--------------------------------------------------------------------------------
-- ai 0.4.1


-- resolve every unqualified name in the rest of this script against pg_catalog,
-- with pg_temp searched last; SET LOCAL keeps this in effect only until the
-- install/upgrade transaction ends
set local search_path to pg_catalog, pg_temp;

/*
make sure that the user doing the install/upgrade is the same user who owns the
schema and migration table. abort the upgrade if different.
*/
do $bootstrap_extension$
declare
    _current_user_id oid = null;
    _schema_owner_id oid = null;
    _migration_table_owner_id oid = null;
begin
    -- @extowner@ is substituted by PostgreSQL with the name of the role
    -- running the install/upgrade
    select pg_catalog.to_regrole('@extowner@')::oid
    into strict _current_user_id;

    -- STRICT raises NO_DATA_FOUND when no ai schema row exists, so
    -- _schema_owner_id can never be null after this statement
    select pg_namespace.nspowner into strict _schema_owner_id
    from pg_catalog.pg_namespace
    where pg_namespace.nspname operator(pg_catalog.=) 'ai';

    if _schema_owner_id is null then
        -- this should NEVER happen
        -- we have `schema=ai` in the control file, so postgres creates the schema automatically
        -- but this line makes pgspot happy
        create schema ai;
    elseif _schema_owner_id operator(pg_catalog.!=) _current_user_id then
        raise exception 'only the owner of the ai schema may install/upgrade this extension';
    end if;

    -- no STRICT here: a NULL result means ai.migration does not exist yet (first install)
    -- the join operator is schema-qualified like every other comparison in this script
    select k.relowner into _migration_table_owner_id
    from pg_catalog.pg_class k
    inner join pg_catalog.pg_namespace n on (k.relnamespace operator(pg_catalog.=) n.oid)
    where k.relname operator(pg_catalog.=) 'migration'
    and n.nspname operator(pg_catalog.=) 'ai';

    if _migration_table_owner_id is not null
    and _migration_table_owner_id is distinct from _current_user_id then
        raise exception 'only the owner of the ai.migration table can install/upgrade this extension';
    end if;

    -- ai.migration records each applied migration (name, version, time, body)
    if _migration_table_owner_id is null then
        create table ai.migration
        ( "name" text not null primary key
        , applied_at_version text not null
        , applied_at timestamptz not null default pg_catalog.clock_timestamp()
        , body text not null
        );
    end if;
end;
$bootstrap_extension$;

-- records any feature flags that were enabled when installing
-- a prerelease version of the extension
create table if not exists ai.feature_flag
( "name" text not null                   -- one row per enabled flag
, applied_at_version text not null       -- presumably the extension version that recorded the flag
, applied_at timestamptz not null default pg_catalog.clock_timestamp()
, primary key ("name")
);


-------------------------------------------------------------------------------
-- 001-vectorizer.sql
-- NOTE(review): the migration body below calls pg_extension_config_dump on
-- 'ai.vectorizer' twice and never on 'ai.vectorizer_errors', so the errors table
-- is presumably not registered for pg_dump. The body is frozen (it is compared
-- with ai.migration.body on every upgrade); fix this in a new migration instead.
do $outer_migration_block$ /*001-vectorizer.sql*/
declare
    _sql text;
    _migration record;
    _migration_name text = $migration_name$001-vectorizer.sql$migration_name$;
    _migration_body text =
$migration_body$

create table ai.vectorizer
( id int not null primary key generated by default as identity
, source_schema name not null
, source_table name not null
, source_pk jsonb not null
, target_schema name not null
, target_table name not null
, view_schema name not null
, view_name name not null
, trigger_name name not null
, queue_schema name
, queue_table name
, config jsonb not null
, unique (target_schema, target_table)
);
perform pg_catalog.pg_extension_config_dump('ai.vectorizer'::pg_catalog.regclass, '');
perform pg_catalog.pg_extension_config_dump('ai.vectorizer_id_seq'::pg_catalog.regclass, '');

create table ai.vectorizer_errors
( id int not null references ai.vectorizer (id) on delete cascade
, message text
, details jsonb
, recorded timestamptz not null default now()
);
create index on ai.vectorizer_errors (id, recorded);
perform pg_catalog.pg_extension_config_dump('ai.vectorizer'::pg_catalog.regclass, '');


$migration_body$;
begin
    -- skip the migration if it has already been recorded
    select * into _migration from ai.migration where "name" operator(pg_catalog.=) _migration_name;
    -- FOUND instead of "_migration is not null": a record IS NOT NULL test is only
    -- true when every column is non-null
    if found then
        -- RAISE substitutes a bare % placeholder (not %s)
        raise notice 'migration % already applied. skipping.', _migration_name;
        if _migration.body operator(pg_catalog.!=) _migration_body then
            raise warning 'the contents of migration "%" have changed', _migration_name;
        end if;
        return;
    end if;
    -- wrap the body in its own DO block so plpgsql statements such as PERFORM work
    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
    execute _sql;
    insert into ai.migration ("name", body, applied_at_version)
    values (_migration_name, _migration_body, $version$0.4.1$version$);
end;
$outer_migration_block$;

-------------------------------------------------------------------------------
-- 002-secret_permissions.sql
do $outer_migration_block$ /*002-secret_permissions.sql*/
declare
    _sql text;
    _migration record;
    _migration_name text = $migration_name$002-secret_permissions.sql$migration_name$;
    _migration_body text =
$migration_body$
create table ai._secret_permissions
( name text not null check(name = '*' or name ~ '^[A-Za-z0-9_.]+$')
, "role" text not null
, primary key (name, "role")
);
-- we add a filter to the dump config in 007-secret_permissions-dump-filter.sql
perform pg_catalog.pg_extension_config_dump('ai._secret_permissions'::pg_catalog.regclass, '');
--only admins will have access to this table
revoke all on ai._secret_permissions from public;

$migration_body$;
begin
    -- skip the migration if it has already been recorded
    select * into _migration from ai.migration where "name" operator(pg_catalog.=) _migration_name;
    -- FOUND instead of "_migration is not null": a record IS NOT NULL test is only
    -- true when every column is non-null
    if found then
        -- RAISE substitutes a bare % placeholder (not %s)
        raise notice 'migration % already applied. skipping.', _migration_name;
        if _migration.body operator(pg_catalog.!=) _migration_body then
            raise warning 'the contents of migration "%" have changed', _migration_name;
        end if;
        return;
    end if;
    -- wrap the body in its own DO block so plpgsql statements such as PERFORM work
    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
    execute _sql;
    insert into ai.migration ("name", body, applied_at_version)
    values (_migration_name, _migration_body, $version$0.4.1$version$);
end;
$outer_migration_block$;

-------------------------------------------------------------------------------
-- 003-vec-storage.sql
do $outer_migration_block$ /*003-vec-storage.sql*/
declare
    _sql text;
    _migration record;
    _migration_name text = $migration_name$003-vec-storage.sql$migration_name$;
    _migration_body text =
$migration_body$

-- switch the vector columns from storage external to storage main
do language plpgsql $block$
declare
    _sql pg_catalog.text;
begin
    for _sql in
    (
        select pg_catalog.format
        ( $sql$alter table %I.%I alter column embedding set storage main$sql$
        , v.target_schema
        , v.target_table
        )
        from ai.vectorizer v
        inner join pg_catalog.pg_class k on (k.relname operator(pg_catalog.=) v.target_table)
        inner join pg_catalog.pg_namespace n
            on (k.relnamespace operator(pg_catalog.=) n.oid and n.nspname operator(pg_catalog.=) v.target_schema)
        inner join pg_catalog.pg_attribute a on (k.oid operator(pg_catalog.=) a.attrelid)
        where a.attname operator(pg_catalog.=) 'embedding'
        and a.attstorage not in ('m', 'p') -- not main or plain
    )
    loop
        raise info '%', _sql;
        execute _sql;
    end loop;
end;
$block$;

$migration_body$;
begin
    -- skip the migration if it has already been recorded
    select * into _migration from ai.migration where "name" operator(pg_catalog.=) _migration_name;
    -- FOUND instead of "_migration is not null": a record IS NOT NULL test is only
    -- true when every column is non-null
    if found then
        -- RAISE substitutes a bare % placeholder (not %s)
        raise notice 'migration % already applied. skipping.', _migration_name;
        if _migration.body operator(pg_catalog.!=) _migration_body then
            raise warning 'the contents of migration "%" have changed', _migration_name;
        end if;
        return;
    end if;
    -- wrap the body in its own DO block so plpgsql statements such as PERFORM work
    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
    execute _sql;
    insert into ai.migration ("name", body, applied_at_version)
    values (_migration_name, _migration_body, $version$0.4.1$version$);
end;
$outer_migration_block$;

-------------------------------------------------------------------------------
-- 004-drop_fn_no_api_key_name.sql
do $outer_migration_block$ /*004-drop_fn_no_api_key_name.sql*/
declare
    _sql text;
    _migration record;
    _migration_name text = $migration_name$004-drop_fn_no_api_key_name.sql$migration_name$;
    _migration_body text =
$migration_body$
drop function if exists ai.openai_list_models(text, text);
drop function if exists ai.openai_embed(text, text,  text, text, int, text);
drop function if exists ai.openai_embed(text,integer[],text,text,integer,text);
drop function if exists ai.openai_embed(text,text[],text,text,integer,text);
drop function if exists ai.openai_chat_complete_simple(text,text);
drop function if exists ai.openai_chat_complete(text,jsonb,text,text,double precision,jsonb,boolean,integer,integer,integer,double precision,jsonb,integer,text,double precision,double precision,jsonb,jsonb,text);
drop function if exists ai.openai_moderate(text,text,text,text);
drop function if exists ai.anthropic_generate(text,jsonb,integer,text,text,double precision,integer,text,text,text[],double precision,jsonb,jsonb,integer,double precision);
drop function if exists ai.cohere_chat_complete(text,text,text,text,jsonb,text,text,jsonb,boolean,jsonb,text,double precision,integer,integer,integer,double precision,integer,text[],double precision,double precision,jsonb,jsonb,boolean);
drop function if exists ai.cohere_classify_simple(text,text[],text,jsonb,text);
drop function if exists ai.cohere_classify(text,text[],text,jsonb,text);
drop function if exists ai.cohere_detokenize(text,integer[],text);
drop function if exists ai.cohere_embed(text,text,text,text,text);
drop function if exists ai.cohere_list_models(text,text,boolean);
drop function if exists ai.cohere_rerank_simple(text,text,jsonb,text,integer,integer);
drop function if exists ai.cohere_rerank(text,text,jsonb,text,integer,text[],boolean,integer);
drop function if exists ai.cohere_tokenize(text,text,text);
drop function if exists ai.reveal_secret(text);

$migration_body$;
begin
    -- skip the migration if it has already been recorded
    select * into _migration from ai.migration where "name" operator(pg_catalog.=) _migration_name;
    -- FOUND instead of "_migration is not null": a record IS NOT NULL test is only
    -- true when every column is non-null
    if found then
        -- RAISE substitutes a bare % placeholder (not %s)
        raise notice 'migration % already applied. skipping.', _migration_name;
        if _migration.body operator(pg_catalog.!=) _migration_body then
            raise warning 'the contents of migration "%" have changed', _migration_name;
        end if;
        return;
    end if;
    -- wrap the body in its own DO block so plpgsql statements such as PERFORM work
    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
    execute _sql;
    insert into ai.migration ("name", body, applied_at_version)
    values (_migration_name, _migration_body, $version$0.4.1$version$);
end;
$outer_migration_block$;

-------------------------------------------------------------------------------
-- 005-vectorizer-queue-pending.sql
do $outer_migration_block$ /*005-vectorizer-queue-pending.sql*/
declare
    _sql text;
    _migration record;
    _migration_name text = $migration_name$005-vectorizer-queue-pending.sql$migration_name$;
    _migration_body text =
$migration_body$

-- we added a new parameter which changes the signature producing a new function
-- drop the old function if it exists from a prior extension version
-- we cascade drop because the ai.vectorizer_status view depends on this function
-- we'll immediate recreate the view, so we should be good
drop function if exists ai.vectorizer_queue_pending(int) cascade;

$migration_body$;
begin
    -- skip the migration if it has already been recorded
    select * into _migration from ai.migration where "name" operator(pg_catalog.=) _migration_name;
    -- FOUND instead of "_migration is not null": a record IS NOT NULL test is only
    -- true when every column is non-null
    if found then
        -- RAISE substitutes a bare % placeholder (not %s)
        raise notice 'migration % already applied. skipping.', _migration_name;
        if _migration.body operator(pg_catalog.!=) _migration_body then
            raise warning 'the contents of migration "%" have changed', _migration_name;
        end if;
        return;
    end if;
    -- wrap the body in its own DO block so plpgsql statements such as PERFORM work
    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
    execute _sql;
    insert into ai.migration ("name", body, applied_at_version)
    values (_migration_name, _migration_body, $version$0.4.1$version$);
end;
$outer_migration_block$;

-------------------------------------------------------------------------------
-- 006-drop-vectorizer.sql
do $outer_migration_block$ /*006-drop-vectorizer.sql*/
declare
    _sql text;
    _migration record;
    _migration_name text = $migration_name$006-drop-vectorizer.sql$migration_name$;
    _migration_body text =
$migration_body$
drop function if exists ai.drop_vectorizer(int) cascade;

$migration_body$;
begin
    -- skip the migration if it has already been recorded
    select * into _migration from ai.migration where "name" operator(pg_catalog.=) _migration_name;
    -- FOUND instead of "_migration is not null": a record IS NOT NULL test is only
    -- true when every column is non-null
    if found then
        -- RAISE substitutes a bare % placeholder (not %s)
        raise notice 'migration % already applied. skipping.', _migration_name;
        if _migration.body operator(pg_catalog.!=) _migration_body then
            raise warning 'the contents of migration "%" have changed', _migration_name;
        end if;
        return;
    end if;
    -- wrap the body in its own DO block so plpgsql statements such as PERFORM work
    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
    execute _sql;
    insert into ai.migration ("name", body, applied_at_version)
    values (_migration_name, _migration_body, $version$0.4.1$version$);
end;
$outer_migration_block$;

-------------------------------------------------------------------------------
-- 007-secret-permissions-dump-filter.sql
do $outer_migration_block$ /*007-secret-permissions-dump-filter.sql*/
declare
    _sql text;
    _migration record;
    _migration_name text = $migration_name$007-secret-permissions-dump-filter.sql$migration_name$;
    _migration_body text =
$migration_body$

do language plpgsql $block$
declare
    _filter text;
    _sql text;
begin
    -- two rows are inserted into the ai._secret_permissions table automatically
    -- on extension creation. these two rows should not be dumped as they cause
    -- duplicate key value violations on the pk constraint when restored
    -- the two rows are inserted on extension creation and then again on table
    -- restore. adding a filter so that they don't get dumped should fix the issue
    select pg_catalog.format
    ( $sql$where ("name", "role") not in (('*', 'pg_database_owner'), ('*', %L))$sql$
    , pg_catalog."session_user"()
    ) into strict _filter
    ;

    -- update the filter criteria on the table
    select pg_catalog.format
    ( $sql$select pg_catalog.pg_extension_config_dump('ai._secret_permissions'::pg_catalog.regclass, %L)$sql$
    , _filter
    ) into strict _sql
    ;
    execute _sql;
end;
$block$;

$migration_body$;
begin
    -- skip the migration if it has already been recorded
    select * into _migration from ai.migration where "name" operator(pg_catalog.=) _migration_name;
    -- FOUND instead of "_migration is not null": a record IS NOT NULL test is only
    -- true when every column is non-null
    if found then
        -- RAISE substitutes a bare % placeholder (not %s)
        raise notice 'migration % already applied. skipping.', _migration_name;
        if _migration.body operator(pg_catalog.!=) _migration_body then
            raise warning 'the contents of migration "%" have changed', _migration_name;
        end if;
        return;
    end if;
    -- wrap the body in its own DO block so plpgsql statements such as PERFORM work
    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
    execute _sql;
    insert into ai.migration ("name", body, applied_at_version)
    values (_migration_name, _migration_body, $version$0.4.1$version$);
end;
$outer_migration_block$;

--------------------------------------------------------------------------------
-- 001-openai.sql

-------------------------------------------------------------------------------
-- openai_tokenize
-- encode text as tokens for a given model
-- https://github.com/openai/tiktoken/blob/main/README.md
create or replace function ai.openai_tokenize(model text, text_input text) returns int[]
as $python$
    # per-session bootstrap: the first call puts the 0.4.1 pgai python package on
    # sys.path; later calls refuse to run if GD was initialized by another version
    if "ai.version" in GD:
        if GD["ai.version"] != "0.4.1":
            plpy.fatal("the pgai extension version has changed. start a new session")
    else:
        lib_rows = plpy.execute("select coalesce(pg_catalog.current_setting('ai.python_lib_dir', true), '/usr/local/lib/pgai') as python_lib_dir")
        from pathlib import Path
        import site
        site.addsitedir(str(Path(lib_rows[0]["python_lib_dir"]).joinpath("0.4.1")))
        from ai import __version__ as ai_version
        assert ai_version == "0.4.1"
        GD["ai.version"] = "0.4.1"
    # encode text_input into token ids using tiktoken's encoding for the given model
    import tiktoken
    return tiktoken.encoding_for_model(model).encode(text_input)
$python$
language plpython3u strict immutable parallel safe security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- openai_detokenize
-- decode tokens for a given model back into text
-- https://github.com/openai/tiktoken/blob/main/README.md
create or replace function ai.openai_detokenize(model text, tokens int[]) returns text
as $python$
    # per-session bootstrap: the first call puts the 0.4.1 pgai python package on
    # sys.path; later calls refuse to run if GD was initialized by another version
    if "ai.version" in GD:
        if GD["ai.version"] != "0.4.1":
            plpy.fatal("the pgai extension version has changed. start a new session")
    else:
        lib_rows = plpy.execute("select coalesce(pg_catalog.current_setting('ai.python_lib_dir', true), '/usr/local/lib/pgai') as python_lib_dir")
        from pathlib import Path
        import site
        site.addsitedir(str(Path(lib_rows[0]["python_lib_dir"]).joinpath("0.4.1")))
        from ai import __version__ as ai_version
        assert ai_version == "0.4.1"
        GD["ai.version"] = "0.4.1"
    # decode token ids back into text using tiktoken's encoding for the given model
    import tiktoken
    return tiktoken.encoding_for_model(model).decode(tokens)
$python$
language plpython3u strict immutable parallel safe security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- openai_list_models
-- list models supported on the openai platform
-- https://platform.openai.com/docs/api-reference/models/list
create or replace function ai.openai_list_models(api_key text default null, api_key_name text default null, base_url text default null)
returns table
( id text
, created timestamptz
, owned_by text
)
as $python$
    # per-session bootstrap: the first call puts the 0.4.1 pgai python package on
    # sys.path; later calls refuse to run if GD was initialized by another version
    if "ai.version" in GD:
        if GD["ai.version"] != "0.4.1":
            plpy.fatal("the pgai extension version has changed. start a new session")
    else:
        lib_rows = plpy.execute("select coalesce(pg_catalog.current_setting('ai.python_lib_dir', true), '/usr/local/lib/pgai') as python_lib_dir")
        from pathlib import Path
        import site
        site.addsitedir(str(Path(lib_rows[0]["python_lib_dir"]).joinpath("0.4.1")))
        from ai import __version__ as ai_version
        assert ai_version == "0.4.1"
        GD["ai.version"] = "0.4.1"
    import ai.openai
    import ai.secrets
    # resolve the key from the explicit argument, a named secret, or the default secret name
    resolved_key = ai.secrets.get_secret(plpy, api_key, api_key_name, ai.openai.DEFAULT_KEY_NAME, SD)
    # stream each row produced by ai.openai.list_models straight through
    yield from ai.openai.list_models(plpy, resolved_key, base_url)
$python$
language plpython3u volatile parallel safe security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- openai_embed
-- generate an embedding from a text value
-- https://platform.openai.com/docs/api-reference/embeddings/create
create or replace function ai.openai_embed
( model text
, input_text text
, api_key text default null
, api_key_name text default null
, base_url text default null
, dimensions int default null
, openai_user text default null
) returns @extschema:vector@.vector
as $python$
    # per-session bootstrap: the first call puts the 0.4.1 pgai python package on
    # sys.path; later calls refuse to run if GD was initialized by another version
    if "ai.version" in GD:
        if GD["ai.version"] != "0.4.1":
            plpy.fatal("the pgai extension version has changed. start a new session")
    else:
        lib_rows = plpy.execute("select coalesce(pg_catalog.current_setting('ai.python_lib_dir', true), '/usr/local/lib/pgai') as python_lib_dir")
        from pathlib import Path
        import site
        site.addsitedir(str(Path(lib_rows[0]["python_lib_dir"]).joinpath("0.4.1")))
        from ai import __version__ as ai_version
        assert ai_version == "0.4.1"
        GD["ai.version"] = "0.4.1"
    import ai.openai
    import ai.secrets
    resolved_key = ai.secrets.get_secret(plpy, api_key, api_key_name, ai.openai.DEFAULT_KEY_NAME, SD)
    results = ai.openai.embed(plpy, model, input_text, api_key=resolved_key, base_url=base_url, dimensions=dimensions, user=openai_user)
    # a single input yields a single result; return its second element (the vector)
    # or fall through to NULL when nothing is produced
    for row in results:
        return row[1]
$python$
language plpython3u immutable parallel safe security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- openai_embed
-- generate embeddings from an array of text values
-- https://platform.openai.com/docs/api-reference/embeddings/create
create or replace function ai.openai_embed
( model text
, input_texts text[]
, api_key text default null
, api_key_name text default null
, base_url text default null
, dimensions int default null
, openai_user text default null
) returns table
( "index" int
, embedding @extschema:vector@.vector
)
as $python$
    # per-session bootstrap: the first call puts the 0.4.1 pgai python package on
    # sys.path; later calls refuse to run if GD was initialized by another version
    if "ai.version" in GD:
        if GD["ai.version"] != "0.4.1":
            plpy.fatal("the pgai extension version has changed. start a new session")
    else:
        lib_rows = plpy.execute("select coalesce(pg_catalog.current_setting('ai.python_lib_dir', true), '/usr/local/lib/pgai') as python_lib_dir")
        from pathlib import Path
        import site
        site.addsitedir(str(Path(lib_rows[0]["python_lib_dir"]).joinpath("0.4.1")))
        from ai import __version__ as ai_version
        assert ai_version == "0.4.1"
        GD["ai.version"] = "0.4.1"
    import ai.openai
    import ai.secrets
    resolved_key = ai.secrets.get_secret(plpy, api_key, api_key_name, ai.openai.DEFAULT_KEY_NAME, SD)
    # emit one ("index", embedding) row per result produced for the input array
    yield from ai.openai.embed(plpy, model, input_texts, api_key=resolved_key, base_url=base_url, dimensions=dimensions, user=openai_user)
$python$
language plpython3u immutable parallel safe security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- openai_embed
-- generate embeddings from an array of tokens
-- https://platform.openai.com/docs/api-reference/embeddings/create
create or replace function ai.openai_embed
( model text
, input_tokens int[]
, api_key text default null
, api_key_name text default null
, base_url text default null
, dimensions int default null
, openai_user text default null
) returns @extschema:vector@.vector
as $python$
    # per-session bootstrap: the first call puts the 0.4.1 pgai python package on
    # sys.path; later calls refuse to run if GD was initialized by another version
    if "ai.version" in GD:
        if GD["ai.version"] != "0.4.1":
            plpy.fatal("the pgai extension version has changed. start a new session")
    else:
        lib_rows = plpy.execute("select coalesce(pg_catalog.current_setting('ai.python_lib_dir', true), '/usr/local/lib/pgai') as python_lib_dir")
        from pathlib import Path
        import site
        site.addsitedir(str(Path(lib_rows[0]["python_lib_dir"]).joinpath("0.4.1")))
        from ai import __version__ as ai_version
        assert ai_version == "0.4.1"
        GD["ai.version"] = "0.4.1"
    import ai.openai
    import ai.secrets
    resolved_key = ai.secrets.get_secret(plpy, api_key, api_key_name, ai.openai.DEFAULT_KEY_NAME, SD)
    results = ai.openai.embed(plpy, model, input_tokens, api_key=resolved_key, base_url=base_url, dimensions=dimensions, user=openai_user)
    # one token array yields one result; return its second element (the vector)
    # or fall through to NULL when nothing is produced
    for row in results:
        return row[1]
$python$
language plpython3u immutable parallel safe security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- openai_chat_complete
-- text generation / chat completion
-- https://platform.openai.com/docs/api-reference/chat/create
create or replace function ai.openai_chat_complete
( model text
, messages jsonb
, api_key text default null
, api_key_name text default null
, base_url text default null
, frequency_penalty float8 default null
, logit_bias jsonb default null
, logprobs boolean default null
, top_logprobs int default null
, max_tokens int default null
, n int default null
, presence_penalty float8 default null
, response_format jsonb default null
, seed int default null
, stop text default null
, temperature float8 default null
, top_p float8 default null
, tools jsonb default null
, tool_choice jsonb default null
, openai_user text default null
) returns jsonb
as $python$
    # per-session bootstrap: the first call puts the 0.4.1 pgai python package on
    # sys.path; later calls refuse to run if GD was initialized by another version
    if "ai.version" in GD:
        if GD["ai.version"] != "0.4.1":
            plpy.fatal("the pgai extension version has changed. start a new session")
    else:
        lib_rows = plpy.execute("select coalesce(pg_catalog.current_setting('ai.python_lib_dir', true), '/usr/local/lib/pgai') as python_lib_dir")
        from pathlib import Path
        import site
        site.addsitedir(str(Path(lib_rows[0]["python_lib_dir"]).joinpath("0.4.1")))
        from ai import __version__ as ai_version
        assert ai_version == "0.4.1"
        GD["ai.version"] = "0.4.1"
    import json
    import ai.openai
    import ai.secrets
    resolved_key = ai.secrets.get_secret(plpy, api_key, api_key_name, ai.openai.DEFAULT_KEY_NAME, SD)
    client = ai.openai.make_client(plpy, resolved_key, base_url)

    def from_json(raw):
        # optional jsonb arguments arrive as JSON text (or None); decode only when present
        return None if raw is None else json.loads(raw)

    parsed_messages = json.loads(messages)
    if not isinstance(parsed_messages, list):
        plpy.error("messages is not an array")

    # decode every optional JSON argument before issuing the request
    parsed_logit_bias = from_json(logit_bias)
    parsed_response_format = from_json(response_format)
    parsed_tools = from_json(tools)
    parsed_tool_choice = from_json(tool_choice)

    # non-streaming request; the full response object is returned as JSON
    completion = client.chat.completions.create(
        model=model,
        messages=parsed_messages,
        frequency_penalty=frequency_penalty,
        logit_bias=parsed_logit_bias,
        logprobs=logprobs,
        top_logprobs=top_logprobs,
        max_tokens=max_tokens,
        n=n,
        presence_penalty=presence_penalty,
        response_format=parsed_response_format,
        seed=seed,
        stop=stop,
        stream=False,
        temperature=temperature,
        top_p=top_p,
        tools=parsed_tools,
        tool_choice=parsed_tool_choice,
        user=openai_user,
    )
    return completion.model_dump_json()
$python$
language plpython3u volatile parallel safe security invoker
set search_path to pg_catalog, pg_temp
;

------------------------------------------------------------------------------------
-- openai_chat_complete_simple
-- simple chat completion that only requires a message and only returns the response
-- sends a fixed system prompt plus the caller's message to the 'gpt-4o' model via
-- ai.openai_chat_complete and returns only choices[0].message.content as text
create or replace function ai.openai_chat_complete_simple
( message text
, api_key text default null
, api_key_name text default null
) returns text
as $func$
declare
    _model text := 'gpt-4o';
    _conversation jsonb := pg_catalog.jsonb_build_array
    ( pg_catalog.jsonb_build_object('role', 'system', 'content', 'you are a helpful assistant')
    , pg_catalog.jsonb_build_object('role', 'user', 'content', message)
    );
    _response jsonb;
begin
    _response := ai.openai_chat_complete(_model, _conversation, api_key, api_key_name);
    return _response
        operator(pg_catalog.->) 'choices'
        operator(pg_catalog.->) 0
        operator(pg_catalog.->) 'message'
        operator(pg_catalog.->>) 'content';
end;
$func$ language plpgsql volatile parallel safe security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- openai_moderate
-- classify text as potentially harmful or not
-- https://platform.openai.com/docs/api-reference/moderations/create
create or replace function ai.openai_moderate
( model text
, input_text text
, api_key text default null
, api_key_name text default null
, base_url text default null
) returns jsonb
as $python$
    # per-session bootstrap: the first call puts the 0.4.1 pgai python package on
    # sys.path; later calls refuse to run if GD was initialized by another version
    if "ai.version" in GD:
        if GD["ai.version"] != "0.4.1":
            plpy.fatal("the pgai extension version has changed. start a new session")
    else:
        lib_rows = plpy.execute("select coalesce(pg_catalog.current_setting('ai.python_lib_dir', true), '/usr/local/lib/pgai') as python_lib_dir")
        from pathlib import Path
        import site
        site.addsitedir(str(Path(lib_rows[0]["python_lib_dir"]).joinpath("0.4.1")))
        from ai import __version__ as ai_version
        assert ai_version == "0.4.1"
        GD["ai.version"] = "0.4.1"
    import ai.openai
    import ai.secrets
    resolved_key = ai.secrets.get_secret(plpy, api_key, api_key_name, ai.openai.DEFAULT_KEY_NAME, SD)
    moderations = ai.openai.make_client(plpy, resolved_key, base_url).moderations
    # the full moderation response is returned as JSON
    return moderations.create(input=input_text, model=model).model_dump_json()
$python$
language plpython3u immutable parallel safe security invoker
set search_path to pg_catalog, pg_temp
;


--------------------------------------------------------------------------------
-- 002-ollama.sql

-------------------------------------------------------------------------------
-- ollama_list_models
-- https://github.com/ollama/ollama/blob/main/docs/api.md#list-local-models
--
create or replace function ai.ollama_list_models(host text default null)
returns table
( "name" text
, model text
, size bigint
, digest text
, family text
, format text
, families jsonb
, parent_model text
, parameter_size text
, quantization_level text
, modified_at timestamptz
)
as $python$
    if "ai.version" not in GD:
        r = plpy.execute("select coalesce(pg_catalog.current_setting('ai.python_lib_dir', true), '/usr/local/lib/pgai') as python_lib_dir")
        python_lib_dir = r[0]["python_lib_dir"]
        from pathlib import Path
        python_lib_dir = Path(python_lib_dir).joinpath("0.4.1")
        import site
        site.addsitedir(str(python_lib_dir))
        from ai import __version__ as ai_version
        assert("0.4.1" == ai_version)
        GD["ai.version"] = "0.4.1"
    else:
        if GD["ai.version"] != "0.4.1":
            plpy.fatal("the pgai extension version has changed. start a new session")
    import ai.ollama
    client = ai.ollama.make_client(plpy, host)
    import json
    resp = client.list()
    models = resp.get("models")
    if models is None:
        # end the generator normally (zero rows); under PEP 479 (python 3.7+),
        # raising StopIteration inside a generator turns into a RuntimeError
        return
    # one row per model; the optional "details" object supplies the middle columns
    for m in models:
        d = m.get("details")
        yield ( m.get("name")
            , m.get("model")
            , m.get("size")
            , m.get("digest")
            , d.get("family") if d is not None else None
            , d.get("format") if d is not None else None
            , json.dumps(d.get("families")) if d is not None else None
            , d.get("parent_model") if d is not None else None
            , d.get("parameter_size") if d is not None else None
            , d.get("quantization_level") if d is not None else None
            , m.get("modified_at")
        )
$python$
language plpython3u volatile parallel safe security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- ollama_ps
-- https://github.com/ollama/ollama/blob/main/docs/api.md#list-running-models
create or replace function ai.ollama_ps(host text default null)
returns table
( "name" text
, model text
, size bigint
, digest text
, parent_model text
, format text
, family text
, families jsonb
, parameter_size text
, quantization_level text
, expires_at timestamptz
, size_vram bigint
)
as $python$
    if "ai.version" not in GD:
        r = plpy.execute("select coalesce(pg_catalog.current_setting('ai.python_lib_dir', true), '/usr/local/lib/pgai') as python_lib_dir")
        python_lib_dir = r[0]["python_lib_dir"]
        from pathlib import Path
        python_lib_dir = Path(python_lib_dir).joinpath("0.4.1")
        import site
        site.addsitedir(str(python_lib_dir))
        from ai import __version__ as ai_version
        assert("0.4.1" == ai_version)
        GD["ai.version"] = "0.4.1"
    else:
        if GD["ai.version"] != "0.4.1":
            plpy.fatal("the pgai extension version has changed. start a new session")
    import ai.ollama
    client = ai.ollama.make_client(plpy, host)
    import json
    resp = client.ps()
    models = resp.get("models")
    if models is None:
        # end the generator normally (zero rows); under PEP 479 (python 3.7+),
        # raising StopIteration inside a generator turns into a RuntimeError
        return
    # one row per running model; the optional "details" object supplies the middle columns
    for m in models:
        d = m.get("details")
        yield ( m.get("name")
            , m.get("model")
            , m.get("size")
            , m.get("digest")
            , d.get("parent_model") if d is not None else None
            , d.get("format") if d is not None else None
            , d.get("family") if d is not None else None
            , json.dumps(d.get("families")) if d is not None else None
            , d.get("parameter_size") if d is not None else None
            , d.get("quantization_level") if d is not None else None
            , m.get("expires_at")
            , m.get("size_vram")
        )
$python$
language plpython3u volatile parallel safe security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- ollama_embed
-- https://github.com/ollama/ollama/blob/main/docs/api.md#generate-embeddings
-- embeds input_text with the named ollama model and returns the response's
-- "embedding" field as a vector. embedding_options, when given, must be a
-- jsonb object; its entries are forwarded as the request options.
create or replace function ai.ollama_embed
( model text
, input_text text
, host text default null
, keep_alive float8 default null
, embedding_options jsonb default null
) returns @extschema:vector@.vector
as $python$
    if "ai.version" not in GD:
        r = plpy.execute("select coalesce(pg_catalog.current_setting('ai.python_lib_dir', true), '/usr/local/lib/pgai') as python_lib_dir")
        python_lib_dir = r[0]["python_lib_dir"]
        from pathlib import Path
        python_lib_dir = Path(python_lib_dir).joinpath("0.4.1")
        import site
        site.addsitedir(str(python_lib_dir))
        from ai import __version__ as ai_version
        assert("0.4.1" == ai_version)
        GD["ai.version"] = "0.4.1"
    else:
        if GD["ai.version"] != "0.4.1":
            plpy.fatal("the pgai extension version has changed. start a new session")
    import ai.ollama
    ollama_client = ai.ollama.make_client(plpy, host)
    request_options = None
    if embedding_options is not None:
        import json
        # pl/python passes jsonb as text; parse it and copy into a plain dict
        request_options = dict(json.loads(embedding_options).items())
    response = ollama_client.embeddings(model, input_text, options=request_options, keep_alive=keep_alive)
    return response.get("embedding")
$python$
language plpython3u immutable parallel safe security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- ollama_generate
-- https://github.com/ollama/ollama/blob/main/docs/api.md#generate-a-completion
-- requests a single non-streamed completion and returns the client's response
-- serialized as jsonb. despite its name, embedding_options is forwarded as the
-- request's generation "options". images (bytea) are sent base64-encoded.
-- optional arguments are only included in the request when non-null.
create or replace function ai.ollama_generate
( model text
, prompt text
, host text default null
, images bytea[] default null
, keep_alive float8 default null
, embedding_options jsonb default null
, system_prompt text default null
, template text default null
, context int[] default null
) returns jsonb
as $python$
    if "ai.version" not in GD:
        r = plpy.execute("select coalesce(pg_catalog.current_setting('ai.python_lib_dir', true), '/usr/local/lib/pgai') as python_lib_dir")
        python_lib_dir = r[0]["python_lib_dir"]
        from pathlib import Path
        python_lib_dir = Path(python_lib_dir).joinpath("0.4.1")
        import site
        site.addsitedir(str(python_lib_dir))
        from ai import __version__ as ai_version
        assert("0.4.1" == ai_version)
        GD["ai.version"] = "0.4.1"
    else:
        if GD["ai.version"] != "0.4.1":
            plpy.fatal("the pgai extension version has changed. start a new session")
    import ai.ollama
    ollama_client = ai.ollama.make_client(plpy, host)

    import json
    extra = {}

    # values forwarded unchanged when the caller supplied them
    for key, value in (("keep_alive", keep_alive), ("system", system_prompt), ("template", template), ("context", context)):
        if value is not None:
            extra[key] = value

    if embedding_options is not None:
        extra["options"] = dict(json.loads(embedding_options).items())

    if images is not None:
        import base64
        # the http api expects base64 text rather than raw bytes
        extra["images"] = [base64.b64encode(img).decode('utf-8') for img in images]

    response = ollama_client.generate(model, prompt, stream=False, **extra)
    return json.dumps(response)
$python$
language plpython3u volatile parallel safe security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- ollama_chat_complete
-- https://github.com/ollama/ollama/blob/main/docs/api.md#generate-a-chat-completion
-- requests a non-streamed chat completion and returns the response as jsonb.
-- messages must be a jsonb array; a message's "images" entries are expected to
-- be base64 strings and are decoded to raw bytes before the call.
create or replace function ai.ollama_chat_complete
( model text
, messages jsonb
, host text default null
, keep_alive float8 default null
, chat_options jsonb default null
) returns jsonb
as $python$
    if "ai.version" not in GD:
        r = plpy.execute("select coalesce(pg_catalog.current_setting('ai.python_lib_dir', true), '/usr/local/lib/pgai') as python_lib_dir")
        python_lib_dir = r[0]["python_lib_dir"]
        from pathlib import Path
        python_lib_dir = Path(python_lib_dir).joinpath("0.4.1")
        import site
        site.addsitedir(str(python_lib_dir))
        from ai import __version__ as ai_version
        assert("0.4.1" == ai_version)
        GD["ai.version"] = "0.4.1"
    else:
        if GD["ai.version"] != "0.4.1":
            plpy.fatal("the pgai extension version has changed. start a new session")
    import ai.ollama
    ollama_client = ai.ollama.make_client(plpy, host)

    import json
    import base64
    extra = {}
    if keep_alive is not None:
        extra["keep_alive"] = keep_alive
    if chat_options is not None:
        extra["options"] = dict(json.loads(chat_options).items())

    conversation = json.loads(messages)
    if not isinstance(conversation, list):
        plpy.error("messages is not an array")

    # the python client wants raw bytes for images, so undo the base64 encoding
    for msg in conversation:
        if 'images' in msg:
            msg["images"] = list(map(base64.b64decode, msg["images"]))

    response = ollama_client.chat(model, conversation, stream=False, **extra)
    return json.dumps(response)
$python$
language plpython3u volatile parallel safe security invoker
set search_path to pg_catalog, pg_temp
;


--------------------------------------------------------------------------------
-- 003-anthropic.sql
-------------------------------------------------------------------------------
-- anthropic_generate
-- https://docs.anthropic.com/en/api/messages
-- sends a messages request to anthropic and returns the response message as
-- jsonb. the api key is resolved through ai.secrets.get_secret.
-- messages, tool_choice and tools are jsonb: pl/python receives jsonb as text,
-- so each is parsed with json.loads before being handed to the client.
-- user_id is sent as the request's metadata object ({"user_id": ...}).
-- optional arguments are only included in the request when non-null.
create or replace function ai.anthropic_generate
( model text
, messages jsonb
, max_tokens int default 1024
, api_key text default null
, api_key_name text default null
, base_url text default null
, timeout float8 default null
, max_retries int default null
, system_prompt text default null
, user_id text default null
, stop_sequences text[] default null
, temperature float8 default null
, tool_choice jsonb default null
, tools jsonb default null
, top_k int default null
, top_p float8 default null
) returns jsonb
as $python$
    if "ai.version" not in GD:
        r = plpy.execute("select coalesce(pg_catalog.current_setting('ai.python_lib_dir', true), '/usr/local/lib/pgai') as python_lib_dir")
        python_lib_dir = r[0]["python_lib_dir"]
        from pathlib import Path
        python_lib_dir = Path(python_lib_dir).joinpath("0.4.1")
        import site
        site.addsitedir(str(python_lib_dir))
        from ai import __version__ as ai_version
        assert("0.4.1" == ai_version)
        GD["ai.version"] = "0.4.1"
    else:
        if GD["ai.version"] != "0.4.1":
            plpy.fatal("the pgai extension version has changed. start a new session")
    import ai.anthropic
    import ai.secrets
    api_key_resolved = ai.secrets.get_secret(plpy, api_key, api_key_name, ai.anthropic.DEFAULT_KEY_NAME, SD)
    client = ai.anthropic.make_client(api_key=api_key_resolved, base_url=base_url, timeout=timeout, max_retries=max_retries)

    import json
    messages_1 = json.loads(messages)

    args = {}
    if system_prompt is not None:
        args["system"] = system_prompt
    if user_id is not None:
        # metadata is an object keyed by "user_id" (a dict, not a set literal)
        args["metadata"] = {"user_id": user_id}
    if stop_sequences is not None:
        args["stop_sequences"] = stop_sequences
    if temperature is not None:
        args["temperature"] = temperature
    if tool_choice is not None:
        # parse the jsonb text; json.dumps would re-encode it as a json string
        args["tool_choice"] = json.loads(tool_choice)
    if tools is not None:
        args["tools"] = json.loads(tools)
    if top_k is not None:
        args["top_k"] = top_k
    if top_p is not None:
        args["top_p"] = top_p

    message = client.messages.create(model=model, messages=messages_1, max_tokens=max_tokens, **args)
    return message.to_json()
$python$
language plpython3u volatile parallel safe security invoker
set search_path to pg_catalog, pg_temp
;

--------------------------------------------------------------------------------
-- 004-cohere.sql
-------------------------------------------------------------------------------
-- cohere_list_models
-- https://docs.cohere.com/reference/list-models
-- pages through cohere's model list (page_size 1000) until no next_page_token
-- is returned, producing one row per model. endpoint and default_only are sent
-- as filters only when non-null.
create or replace function ai.cohere_list_models
( api_key text default null
, api_key_name text default null
, endpoint text default null
, default_only bool default null
)
returns table
( "name" text
, endpoints text[]
, finetuned bool
, context_length int
, tokenizer_url text
, default_endpoints text[]
)
as $python$
    if "ai.version" not in GD:
        r = plpy.execute("select coalesce(pg_catalog.current_setting('ai.python_lib_dir', true), '/usr/local/lib/pgai') as python_lib_dir")
        python_lib_dir = r[0]["python_lib_dir"]
        from pathlib import Path
        python_lib_dir = Path(python_lib_dir).joinpath("0.4.1")
        import site
        site.addsitedir(str(python_lib_dir))
        from ai import __version__ as ai_version
        assert("0.4.1" == ai_version)
        GD["ai.version"] = "0.4.1"
    else:
        if GD["ai.version"] != "0.4.1":
            plpy.fatal("the pgai extension version has changed. start a new session")
    import ai.cohere
    import ai.secrets
    resolved_key = ai.secrets.get_secret(plpy, api_key, api_key_name, ai.cohere.DEFAULT_KEY_NAME, SD)
    cohere_client = ai.cohere.make_client(resolved_key)

    filters = {}
    if endpoint is not None:
        filters["endpoint"] = endpoint
    if default_only is not None:
        filters["default_only"] = default_only

    token = None
    more = True
    while more:
        page = cohere_client.models.list(page_size=1000, page_token=token, **filters)
        for mdl in page.models:
            yield (mdl.name, mdl.endpoints, mdl.finetuned, mdl.context_length, mdl.tokenizer_url, mdl.default_endpoints)
        token = page.next_page_token
        # a missing token marks the last page
        more = token is not None
$python$
language plpython3u volatile parallel safe security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- cohere_tokenize
-- https://docs.cohere.com/reference/tokenize
-- returns the token ids that the named cohere model's tokenizer produces for
-- text_input. the api key is resolved through ai.secrets.get_secret.
create or replace function ai.cohere_tokenize
( model text
, text_input text
, api_key text default null
, api_key_name text default null
) returns int[]
as $python$
    if "ai.version" not in GD:
        r = plpy.execute("select coalesce(pg_catalog.current_setting('ai.python_lib_dir', true), '/usr/local/lib/pgai') as python_lib_dir")
        python_lib_dir = r[0]["python_lib_dir"]
        from pathlib import Path
        python_lib_dir = Path(python_lib_dir).joinpath("0.4.1")
        import site
        site.addsitedir(str(python_lib_dir))
        from ai import __version__ as ai_version
        assert("0.4.1" == ai_version)
        GD["ai.version"] = "0.4.1"
    else:
        if GD["ai.version"] != "0.4.1":
            plpy.fatal("the pgai extension version has changed. start a new session")
    import ai.cohere
    import ai.secrets
    cohere_client = ai.cohere.make_client(
        ai.secrets.get_secret(plpy, api_key, api_key_name, ai.cohere.DEFAULT_KEY_NAME, SD)
    )
    tokenized = cohere_client.tokenize(text=text_input, model=model)
    return tokenized.tokens
$python$
language plpython3u immutable parallel safe security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- cohere_detokenize
-- https://docs.cohere.com/reference/detokenize
-- converts token ids back into text using the named cohere model's tokenizer.
-- the api key is resolved through ai.secrets.get_secret.
create or replace function ai.cohere_detokenize
( model text
, tokens int[]
, api_key text default null
, api_key_name text default null
) returns text
as $python$
    if "ai.version" not in GD:
        r = plpy.execute("select coalesce(pg_catalog.current_setting('ai.python_lib_dir', true), '/usr/local/lib/pgai') as python_lib_dir")
        python_lib_dir = r[0]["python_lib_dir"]
        from pathlib import Path
        python_lib_dir = Path(python_lib_dir).joinpath("0.4.1")
        import site
        site.addsitedir(str(python_lib_dir))
        from ai import __version__ as ai_version
        assert("0.4.1" == ai_version)
        GD["ai.version"] = "0.4.1"
    else:
        if GD["ai.version"] != "0.4.1":
            plpy.fatal("the pgai extension version has changed. start a new session")
    import ai.cohere
    import ai.secrets
    cohere_client = ai.cohere.make_client(
        ai.secrets.get_secret(plpy, api_key, api_key_name, ai.cohere.DEFAULT_KEY_NAME, SD)
    )
    detokenized = cohere_client.detokenize(tokens=tokens, model=model)
    return detokenized.text
$python$
language plpython3u immutable parallel safe security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- cohere_embed
-- https://docs.cohere.com/reference/embed-1
-- embeds a single text with the named cohere model and returns its embedding
-- as a vector. truncate_long_inputs is sent as cohere's "truncate" argument;
-- null optional arguments are omitted from the request.
create or replace function ai.cohere_embed
( model text
, input_text text
, api_key text default null
, api_key_name text default null
, input_type text default null
, truncate_long_inputs text default null
) returns @extschema:vector@.vector
as $python$
    if "ai.version" not in GD:
        r = plpy.execute("select coalesce(pg_catalog.current_setting('ai.python_lib_dir', true), '/usr/local/lib/pgai') as python_lib_dir")
        python_lib_dir = r[0]["python_lib_dir"]
        from pathlib import Path
        python_lib_dir = Path(python_lib_dir).joinpath("0.4.1")
        import site
        site.addsitedir(str(python_lib_dir))
        from ai import __version__ as ai_version
        assert("0.4.1" == ai_version)
        GD["ai.version"] = "0.4.1"
    else:
        if GD["ai.version"] != "0.4.1":
            plpy.fatal("the pgai extension version has changed. start a new session")
    import ai.cohere
    import ai.secrets
    resolved_key = ai.secrets.get_secret(plpy, api_key, api_key_name, ai.cohere.DEFAULT_KEY_NAME, SD)
    cohere_client = ai.cohere.make_client(resolved_key)

    candidates = (("input_type", input_type), ("truncate", truncate_long_inputs))
    extra = {key: val for key, val in candidates if val is not None}
    result = cohere_client.embed(texts=[input_text], model=model, **extra)
    # a single text was sent, so take the single embedding back
    return result.embeddings[0]
$python$
language plpython3u immutable parallel safe security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- cohere_classify
-- https://docs.cohere.com/reference/classify
-- classifies inputs with the named cohere model and returns the whole response
-- as jsonb. examples (jsonb) is parsed before sending; truncate_long_inputs is
-- sent as cohere's "truncate" argument. null optional arguments are omitted.
create or replace function ai.cohere_classify
( model text
, inputs text[]
, api_key text default null
, api_key_name text default null
, examples jsonb default null
, truncate_long_inputs text default null
) returns jsonb
as $python$
    if "ai.version" not in GD:
        r = plpy.execute("select coalesce(pg_catalog.current_setting('ai.python_lib_dir', true), '/usr/local/lib/pgai') as python_lib_dir")
        python_lib_dir = r[0]["python_lib_dir"]
        from pathlib import Path
        python_lib_dir = Path(python_lib_dir).joinpath("0.4.1")
        import site
        site.addsitedir(str(python_lib_dir))
        from ai import __version__ as ai_version
        assert("0.4.1" == ai_version)
        GD["ai.version"] = "0.4.1"
    else:
        if GD["ai.version"] != "0.4.1":
            plpy.fatal("the pgai extension version has changed. start a new session")
    import ai.cohere
    import ai.secrets
    resolved_key = ai.secrets.get_secret(plpy, api_key, api_key_name, ai.cohere.DEFAULT_KEY_NAME, SD)
    cohere_client = ai.cohere.make_client(resolved_key)

    import json
    extra = {}
    if truncate_long_inputs is not None:
        extra["truncate"] = truncate_long_inputs
    if examples is not None:
        # pl/python passes jsonb as text
        extra["examples"] = json.loads(examples)

    result = cohere_client.classify(inputs=inputs, model=model, **extra)
    return result.json()
$python$
language plpython3u immutable parallel safe security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- cohere_classify_simple
-- https://docs.cohere.com/reference/classify
-- issues the same classify request as ai.cohere_classify but returns one row
-- (input, prediction, confidence) per classification instead of raw jsonb.
create or replace function ai.cohere_classify_simple
( model text
, inputs text[]
, api_key text default null
, api_key_name text default null
, examples jsonb default null
, truncate_long_inputs text default null
) returns table
( input text
, prediction text
, confidence float8
)
as $python$
    if "ai.version" not in GD:
        r = plpy.execute("select coalesce(pg_catalog.current_setting('ai.python_lib_dir', true), '/usr/local/lib/pgai') as python_lib_dir")
        python_lib_dir = r[0]["python_lib_dir"]
        from pathlib import Path
        python_lib_dir = Path(python_lib_dir).joinpath("0.4.1")
        import site
        site.addsitedir(str(python_lib_dir))
        from ai import __version__ as ai_version
        assert("0.4.1" == ai_version)
        GD["ai.version"] = "0.4.1"
    else:
        if GD["ai.version"] != "0.4.1":
            plpy.fatal("the pgai extension version has changed. start a new session")
    import ai.cohere
    import ai.secrets
    resolved_key = ai.secrets.get_secret(plpy, api_key, api_key_name, ai.cohere.DEFAULT_KEY_NAME, SD)
    cohere_client = ai.cohere.make_client(resolved_key)

    import json
    extra = {}
    if truncate_long_inputs is not None:
        extra["truncate"] = truncate_long_inputs
    if examples is not None:
        extra["examples"] = json.loads(examples)
    result = cohere_client.classify(inputs=inputs, model=model, **extra)
    yield from ((c.input, c.prediction, c.confidence) for c in result.classifications)
$python$
language plpython3u immutable parallel safe security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- cohere_rerank
-- https://docs.cohere.com/reference/rerank
-- reranks documents (a jsonb value, parsed before sending) against query and
-- returns the whole response as jsonb. null optional arguments are omitted.
create or replace function ai.cohere_rerank
( model text
, query text
, documents jsonb
, api_key text default null
, api_key_name text default null
, top_n integer default null
, rank_fields text[] default null
, return_documents bool default null
, max_chunks_per_doc int default null
) returns jsonb
as $python$
    if "ai.version" not in GD:
        r = plpy.execute("select coalesce(pg_catalog.current_setting('ai.python_lib_dir', true), '/usr/local/lib/pgai') as python_lib_dir")
        python_lib_dir = r[0]["python_lib_dir"]
        from pathlib import Path
        python_lib_dir = Path(python_lib_dir).joinpath("0.4.1")
        import site
        site.addsitedir(str(python_lib_dir))
        from ai import __version__ as ai_version
        assert("0.4.1" == ai_version)
        GD["ai.version"] = "0.4.1"
    else:
        if GD["ai.version"] != "0.4.1":
            plpy.fatal("the pgai extension version has changed. start a new session")
    import ai.cohere
    import ai.secrets
    resolved_key = ai.secrets.get_secret(plpy, api_key, api_key_name, ai.cohere.DEFAULT_KEY_NAME, SD)
    cohere_client = ai.cohere.make_client(resolved_key)

    import json
    optional = (
        ("top_n", top_n),
        ("rank_fields", rank_fields),
        ("return_documents", return_documents),
        ("max_chunks_per_doc", max_chunks_per_doc),
    )
    extra = {name: value for name, value in optional if value is not None}
    parsed_documents = json.loads(documents)
    result = cohere_client.rerank(model=model, query=query, documents=parsed_documents, **extra)
    return result.json()
$python$
language plpython3u immutable parallel safe security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- cohere_rerank_simple
-- https://docs.cohere.com/reference/rerank
-- convenience wrapper over ai.cohere_rerank: always asks for the documents to
-- be returned and unpacks the response's "results" array into rows.
create or replace function ai.cohere_rerank_simple
( model text
, query text
, documents jsonb
, api_key text default null
, api_key_name text default null
, top_n integer default null
, max_chunks_per_doc int default null
) returns table
( "index" int
, "document" jsonb
, relevance_score float8
)
as $func$
select
  r."index"
, r."document"
, r.relevance_score
from pg_catalog.jsonb_to_recordset
( ai.cohere_rerank
  ( model
  , query
  , documents
  , api_key=>api_key
  , api_key_name=>api_key_name
  , top_n=>top_n
  , return_documents=>true
  , max_chunks_per_doc=>max_chunks_per_doc
  ) operator(pg_catalog.->) 'results'
) as r ("index" int, "document" jsonb, relevance_score float8)
$func$ language sql immutable parallel safe security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- cohere_chat_complete
-- https://docs.cohere.com/reference/chat
-- sends one chat message to cohere and returns the response as jsonb.
-- jsonb arguments (chat_history, connectors, documents, tools, tool_results)
-- are parsed before sending; any null optional argument is omitted.
create or replace function ai.cohere_chat_complete
( model text
, message text
, api_key text default null
, api_key_name text default null
, preamble text default null
, chat_history jsonb default null
, conversation_id text default null
, prompt_truncation text default null
, connectors jsonb default null
, search_queries_only bool default null
, documents jsonb default null
, citation_quality text default null
, temperature float8 default null
, max_tokens int default null
, max_input_tokens int default null
, k int default null
, p float8 default null
, seed int default null
, stop_sequences text[] default null
, frequency_penalty float8 default null
, presence_penalty float8 default null
, tools jsonb default null
, tool_results jsonb default null
, force_single_step bool default null
) returns jsonb
as $python$
    if "ai.version" not in GD:
        r = plpy.execute("select coalesce(pg_catalog.current_setting('ai.python_lib_dir', true), '/usr/local/lib/pgai') as python_lib_dir")
        python_lib_dir = r[0]["python_lib_dir"]
        from pathlib import Path
        python_lib_dir = Path(python_lib_dir).joinpath("0.4.1")
        import site
        site.addsitedir(str(python_lib_dir))
        from ai import __version__ as ai_version
        assert("0.4.1" == ai_version)
        GD["ai.version"] = "0.4.1"
    else:
        if GD["ai.version"] != "0.4.1":
            plpy.fatal("the pgai extension version has changed. start a new session")
    import ai.cohere
    import ai.secrets
    resolved_key = ai.secrets.get_secret(plpy, api_key, api_key_name, ai.cohere.DEFAULT_KEY_NAME, SD)
    cohere_client = ai.cohere.make_client(resolved_key)

    import json
    # optional arguments forwarded unchanged when supplied
    plain_args = (
        ("preamble", preamble),
        ("conversation_id", conversation_id),
        ("prompt_truncation", prompt_truncation),
        ("search_queries_only", search_queries_only),
        ("citation_quality", citation_quality),
        ("temperature", temperature),
        ("max_tokens", max_tokens),
        ("max_input_tokens", max_input_tokens),
        ("k", k),
        ("p", p),
        ("seed", seed),
        ("stop_sequences", stop_sequences),
        ("frequency_penalty", frequency_penalty),
        ("presence_penalty", presence_penalty),
        ("force_single_step", force_single_step),
    )
    # optional jsonb arguments: pl/python hands these over as text, so parse them
    jsonb_args = (
        ("chat_history", chat_history),
        ("connectors", connectors),
        ("documents", documents),
        ("tools", tools),
        ("tool_results", tool_results),
    )
    extra = {name: value for name, value in plain_args if value is not None}
    for name, raw in jsonb_args:
        if raw is not None:
            extra[name] = json.loads(raw)

    result = cohere_client.chat(model=model, message=message, **extra)
    return result.json()
$python$
language plpython3u volatile parallel safe security invoker
set search_path to pg_catalog, pg_temp
;

--------------------------------------------------------------------------------
-- 005-chunking.sql

-------------------------------------------------------------------------------
-- chunking_character_text_splitter
-- builds the jsonb chunking config for the 'character_text_splitter'
-- implementation. arguments passed as null are left out of the object.
create or replace function ai.chunking_character_text_splitter
( chunk_column name
, chunk_size int default 800
, chunk_overlap int default 400
, separator text default E'\n\n'
, is_separator_regex bool default false
) returns jsonb
as $func$
    select json_object
    ( 'implementation' value 'character_text_splitter'
    , 'config_type' value 'chunking'
    , 'chunk_column' value chunk_column
    , 'chunk_size' value chunk_size
    , 'chunk_overlap' value chunk_overlap
    , 'separator' value separator
    , 'is_separator_regex' value is_separator_regex
    absent on null
    returning jsonb
    )
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- chunking_recursive_character_text_splitter
-- builds the jsonb chunking config for the 'recursive_character_text_splitter'
-- implementation; separators is stored as a json array in the given order.
-- arguments passed as null are left out of the object.
create or replace function ai.chunking_recursive_character_text_splitter
( chunk_column name
, chunk_size int default 800
, chunk_overlap int default 400
, separators text[] default array[E'\n\n', E'\n', '.', '?', '!', ' ', '']
, is_separator_regex bool default false
) returns jsonb
as $func$
    select json_object
    ( 'implementation' value 'recursive_character_text_splitter'
    , 'config_type' value 'chunking'
    , 'chunk_column' value chunk_column
    , 'chunk_size' value chunk_size
    , 'chunk_overlap' value chunk_overlap
    , 'separators' value separators
    , 'is_separator_regex' value is_separator_regex
    absent on null
    returning jsonb
    )
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _validate_chunking
-- validates a chunking config against its source table. raises an exception
-- when:
--   * config is not a jsonb object
--   * config_type is missing or is not 'chunking'
--   * implementation is missing or is not one of the two text splitters
--   * chunk_column does not name a live column of source_schema.source_table
--     whose pg_catalog type name is text, varchar, char or bpchar
create or replace function ai._validate_chunking
( config jsonb
, source_schema name
, source_table name
) returns void
as $func$
declare
    _config_type text;
    _implementation text;
    _chunk_column text;
    _found bool;
begin
    if pg_catalog.jsonb_typeof(config) != 'object' then
        raise exception 'chunking config is not a jsonb object';
    end if;

    _config_type = config operator(pg_catalog.->>) 'config_type';
    if _config_type is null or _config_type != 'chunking' then
        raise exception 'invalid config_type for chunking config';
    end if;

    _implementation = config operator(pg_catalog.->>) 'implementation';
    if _implementation is null or _implementation not in ('character_text_splitter', 'recursive_character_text_splitter') then
        raise exception 'invalid chunking config implementation';
    end if;

    _chunk_column = config operator(pg_catalog.->>) 'chunk_column';

    -- types are matched by name, so pin them to pg_catalog: a user-defined type
    -- that happens to be named e.g. "text" in another schema must not qualify
    _found = exists
    ( select 1
      from pg_catalog.pg_class k
      inner join pg_catalog.pg_namespace n on (k.relnamespace operator(pg_catalog.=) n.oid)
      inner join pg_catalog.pg_attribute a on (k.oid operator(pg_catalog.=) a.attrelid)
      inner join pg_catalog.pg_type y on (a.atttypid operator(pg_catalog.=) y.oid)
      where n.nspname operator(pg_catalog.=) source_schema
      and k.relname operator(pg_catalog.=) source_table
      and a.attnum operator(pg_catalog.>) 0
      and not a.attisdropped
      and a.attname operator(pg_catalog.=) _chunk_column
      and y.typnamespace operator(pg_catalog.=) 'pg_catalog'::pg_catalog.regnamespace::pg_catalog.oid
      and y.typname in ('text', 'varchar', 'char', 'bpchar')
    );
    if not _found then
        raise exception 'chunk column in config does not exist in the table: %', _chunk_column;
    end if;
end
$func$ language plpgsql stable security invoker
set search_path to pg_catalog, pg_temp
;


--------------------------------------------------------------------------------
-- 006-formatting.sql

-------------------------------------------------------------------------------
-- formatting_python_template
-- builds the jsonb formatting config for the 'python_template' implementation.
-- the template is stored as given; a null template is left out of the object.
create or replace function ai.formatting_python_template(template text default '$chunk') returns jsonb
as $func$
    select json_object
    ( 'implementation' value 'python_template'
    , 'config_type' value 'formatting'
    , 'template' value template
    absent on null
    returning jsonb
    )
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _validate_formatting_python_template
-- validates a python_template formatting config against its source table.
-- raises an exception when the template is missing/null or lacks the $chunk
-- placeholder, or when the source table has a column named "chunk"
-- (presumably because $chunk would then be ambiguous -- confirm).
create or replace function ai._validate_formatting_python_template
( config jsonb
, source_schema name
, source_table name
) returns void
as $func$
declare
    _template text;
    _found bool;
begin
    _template = config operator(pg_catalog.->>) 'template';
    -- like() yields null for a null template, which would not trigger the
    -- raise; test for null explicitly so a missing template is rejected
    if _template is null or not pg_catalog.like(_template, '%$chunk%') then
        raise exception 'template must contain $chunk placeholder';
    end if;

    -- check that no columns on the source table are named "chunk"
    _found = exists
    ( select 1
      from pg_catalog.pg_class k
      inner join pg_catalog.pg_namespace n on (k.relnamespace operator(pg_catalog.=) n.oid)
      inner join pg_catalog.pg_attribute a on (k.oid operator(pg_catalog.=) a.attrelid)
      where n.nspname operator(pg_catalog.=) source_schema
      and k.relname operator(pg_catalog.=) source_table
      and a.attnum operator(pg_catalog.>) 0
      and a.attname operator(pg_catalog.=) 'chunk'
    );
    if _found then
        raise exception 'formatting_python_template may not be used when source table has a column named "chunk"';
    end if;
end
$func$ language plpgsql stable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _validate_formatting
-- validates a formatting config: it must be a jsonb object whose config_type
-- is 'formatting' and whose implementation is recognized; the config is then
-- checked against the source table by the implementation's own validator.
-- declared stable, not immutable: ai._validate_formatting_python_template
-- queries the system catalogs, so the outcome depends on database state.
create or replace function ai._validate_formatting
( config jsonb
, source_schema name
, source_table name
) returns void
as $func$
declare
    _config_type text;
begin
    if pg_catalog.jsonb_typeof(config) != 'object' then
        raise exception 'formatting config is not a jsonb object';
    end if;

    _config_type = config operator(pg_catalog.->>) 'config_type';
    if _config_type is null or _config_type != 'formatting' then
        raise exception 'invalid config_type for formatting config';
    end if;
    case config operator(pg_catalog.->>) 'implementation'
        when 'python_template' then
            perform ai._validate_formatting_python_template
            ( config
            , source_schema
            , source_table
            );
        else
            raise exception 'unrecognized formatting implementation';
    end case;
end
$func$ language plpgsql stable security invoker
set search_path to pg_catalog, pg_temp
;


--------------------------------------------------------------------------------
-- 007-scheduling.sql

-------------------------------------------------------------------------------
-- scheduling_none
-- returns the scheduling config whose implementation is 'none'.
-- jsonb normalizes key order, so the argument order below does not matter.
create or replace function ai.scheduling_none() returns jsonb
as $func$
    select pg_catalog.jsonb_build_object
    ( 'config_type', 'scheduling'
    , 'implementation', 'none'
    )
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- scheduling_default
-- returns the placeholder scheduling config whose implementation is 'default'.
-- jsonb normalizes key order, so the argument order below does not matter.
create or replace function ai.scheduling_default() returns jsonb
as $func$
    select pg_catalog.jsonb_build_object
    ( 'config_type', 'scheduling'
    , 'implementation', 'default'
    )
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- scheduling_timescaledb
-- builds a 'timescaledb' scheduling config.
-- every optional argument left null is omitted from the resulting object;
-- schedule_interval defaults to five minutes.
create or replace function ai.scheduling_timescaledb
( schedule_interval interval default interval '5m'
, initial_start timestamptz default null
, fixed_schedule bool default null
, timezone text default null
) returns jsonb
as $func$
    -- build the complete object, then strip the keys whose value is null
    select pg_catalog.jsonb_strip_nulls
    ( pg_catalog.jsonb_build_object
      ( 'implementation', 'timescaledb'
      , 'config_type', 'scheduling'
      , 'schedule_interval', schedule_interval
      , 'initial_start', initial_start
      , 'fixed_schedule', fixed_schedule
      , 'timezone', timezone
      )
    )
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _resolve_scheduling_default
-- resolves the scheduling config to use when the caller asked for the default.
-- reads the 'ai.scheduling_default' setting: 'scheduling_timescaledb' selects
-- ai.scheduling_timescaledb(); any other value, or an unset setting, selects
-- ai.scheduling_none().
create or replace function ai._resolve_scheduling_default() returns jsonb
as $func$
declare
    _choice text;
begin
    -- missing_ok => true: an undefined setting yields null instead of an error
    _choice = pg_catalog.current_setting('ai.scheduling_default', true);
    if _choice operator(pg_catalog.=) 'scheduling_timescaledb' then
        return ai.scheduling_timescaledb();
    end if;
    return ai.scheduling_none();
end;
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _validate_scheduling
-- validates a scheduling config, raising on the first problem found:
--   * it must be a jsonb object
--   * its config_type must be 'scheduling'
--   * its implementation must be present and one of 'none' or 'timescaledb'
create or replace function ai._validate_scheduling(config jsonb) returns void
as $func$
declare
    _kind text;
    _impl text;
begin
    if pg_catalog.jsonb_typeof(config) != 'object' then
        raise exception 'scheduling config is not a jsonb object';
    end if;

    -- "is distinct from" also rejects a missing config_type (null)
    _kind = config operator(pg_catalog.->>) 'config_type';
    if _kind is distinct from 'scheduling' then
        raise exception 'invalid config_type for scheduling config';
    end if;

    _impl = config operator(pg_catalog.->>) 'implementation';
    if _impl is null then
        raise exception 'scheduling implementation not specified';
    elsif _impl not in ('none', 'timescaledb') then
        raise exception 'unrecognized scheduling implementation: "%"', _impl;
    end if;
end
$func$ language plpgsql immutable security invoker
set search_path to pg_catalog, pg_temp
;


--------------------------------------------------------------------------------
-- 008-embedding.sql

-------------------------------------------------------------------------------
-- embedding_openai
-- builds an 'openai' embedding config.
--   model        - embedding model name
--   dimensions   - number of dimensions requested from the model
--   chat_user    - optional value stored under the "user" key
--   api_key_name - name under which the api key is looked up
-- optional arguments left null are omitted from the resulting object.
create or replace function ai.embedding_openai
( model text
, dimensions int
, chat_user text default null
, api_key_name text default 'OPENAI_API_KEY'
) returns jsonb
as $func$
    select pg_catalog.jsonb_strip_nulls
    ( pg_catalog.jsonb_build_object
      ( 'implementation', 'openai'
      , 'config_type', 'embedding'
      , 'model', model
      , 'dimensions', dimensions
      , 'user', chat_user
      , 'api_key_name', api_key_name
      )
    )
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _validate_embedding
-- validates an embedding config, raising on the first problem found:
--   * it must be a jsonb object
--   * its config_type must be 'embedding'
--   * its implementation must be present and equal to 'openai'
create or replace function ai._validate_embedding(config jsonb) returns void
as $func$
declare
    _kind text;
    _impl text;
begin
    if pg_catalog.jsonb_typeof(config) != 'object' then
        raise exception 'embedding config is not a jsonb object';
    end if;

    -- "is distinct from" also rejects a missing config_type (null)
    _kind = config operator(pg_catalog.->>) 'config_type';
    if _kind is distinct from 'embedding' then
        raise exception 'invalid config_type for embedding config';
    end if;

    _impl = config operator(pg_catalog.->>) 'implementation';
    if _impl is null then
        raise exception 'embedding implementation not specified';
    elsif _impl operator(pg_catalog.!=) 'openai' then
        raise exception 'invalid embedding implementation: "%"', _impl;
    end if;
end
$func$ language plpgsql immutable security invoker
set search_path to pg_catalog, pg_temp
;


--------------------------------------------------------------------------------
-- 009-indexing.sql

-------------------------------------------------------------------------------
-- indexing_none
create or replace function ai.indexing_none() returns jsonb
as $func$
    -- returns the indexing config whose implementation is 'none'.
    -- jsonb_build_object is schema-qualified like every other built-in call in
    -- this file, so resolution never depends on the search_path
    select pg_catalog.jsonb_build_object
    ( 'implementation', 'none'
    , 'config_type', 'indexing'
    )
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- indexing_default
create or replace function ai.indexing_default() returns jsonb
as $func$
    -- returns the placeholder indexing config whose implementation is 'default'.
    -- jsonb_build_object is schema-qualified like every other built-in call in
    -- this file, so resolution never depends on the search_path
    select pg_catalog.jsonb_build_object
    ( 'implementation', 'default'
    , 'config_type', 'indexing'
    )
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- indexing_diskann
-- builds a 'diskann' indexing config.
-- min_rows defaults to 100000 and create_when_queue_empty to true; every
-- other tuning argument is optional and omitted from the object when null.
create or replace function ai.indexing_diskann
( min_rows int default 100000
, storage_layout text default null
, num_neighbors int default null
, search_list_size int default null
, max_alpha float8 default null
, num_dimensions int default null
, num_bits_per_dimension int default null
, create_when_queue_empty boolean default true
) returns jsonb
as $func$
    select pg_catalog.jsonb_strip_nulls
    ( pg_catalog.jsonb_build_object
      ( 'implementation', 'diskann'
      , 'config_type', 'indexing'
      , 'min_rows', min_rows
      , 'storage_layout', storage_layout
      , 'num_neighbors', num_neighbors
      , 'search_list_size', search_list_size
      , 'max_alpha', max_alpha
      , 'num_dimensions', num_dimensions
      , 'num_bits_per_dimension', num_bits_per_dimension
      , 'create_when_queue_empty', create_when_queue_empty
      )
    )
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _resolve_indexing_default
-- resolves the indexing config to use when the caller asked for the default.
-- reads the 'ai.indexing_default' setting: 'indexing_diskann' and
-- 'indexing_hnsw' select the matching builder with its default arguments;
-- any other value, or an unset setting, selects ai.indexing_none().
create or replace function ai._resolve_indexing_default() returns jsonb
as $func$
declare
    _choice text;
begin
    -- missing_ok => true: an undefined setting yields null instead of an error
    _choice = pg_catalog.current_setting('ai.indexing_default', true);
    if _choice operator(pg_catalog.=) 'indexing_diskann' then
        return ai.indexing_diskann();
    elsif _choice operator(pg_catalog.=) 'indexing_hnsw' then
        return ai.indexing_hnsw();
    end if;
    return ai.indexing_none();
end;
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _validate_indexing_diskann
-- validates the diskann-specific part of an indexing config.
-- storage_layout is optional; when present it must be 'memory_optimized' or 'plain'.
create or replace function ai._validate_indexing_diskann(config jsonb) returns void
as $func$
declare
    _layout text;
begin
    _layout = config operator(pg_catalog.->>) 'storage_layout';
    if _layout is null then
        return;
    end if;
    if _layout not in ('memory_optimized', 'plain') then
        raise exception 'invalid storage_layout';
    end if;
end
$func$ language plpgsql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- indexing_hnsw
-- builds an 'hnsw' indexing config.
-- min_rows defaults to 100000, opclass to 'vector_cosine_ops' and
-- create_when_queue_empty to true; null arguments are omitted from the object.
create or replace function ai.indexing_hnsw
( min_rows int default 100000
, opclass text default 'vector_cosine_ops'
, m int default null
, ef_construction int default null
, create_when_queue_empty boolean default true
) returns jsonb
as $func$
    select pg_catalog.jsonb_strip_nulls
    ( pg_catalog.jsonb_build_object
      ( 'implementation', 'hnsw'
      , 'config_type', 'indexing'
      , 'min_rows', min_rows
      , 'opclass', opclass
      , 'm', m
      , 'ef_construction', ef_construction
      , 'create_when_queue_empty', create_when_queue_empty
      )
    )
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _validate_indexing_hnsw
create or replace function ai._validate_indexing_hnsw(config jsonb) returns void
as $func$
-- validates the hnsw-specific part of an indexing config.
-- opclass is optional; when present it must be one of the pgvector operator
-- classes for the vector type. vector_l2_ops (euclidean distance) was missing
-- from the allow-list even though it is a valid hnsw opclass for vector columns
declare
    _opclass text;
begin
    _opclass = config operator(pg_catalog.->>) 'opclass';
    if _opclass is not null
    and not (_opclass operator(pg_catalog.=) any(array['vector_l2_ops', 'vector_ip_ops', 'vector_cosine_ops', 'vector_l1_ops'])) then
        raise exception 'invalid opclass';
    end if;
end
$func$ language plpgsql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _validate_indexing
-- validates an indexing config, raising on the first problem found:
--   * it must be a jsonb object
--   * its config_type must be 'indexing'
--   * its implementation must be present and one of 'none', 'diskann', 'hnsw';
--     diskann and hnsw configs are further checked by their own validators
create or replace function ai._validate_indexing(config jsonb) returns void
as $func$
declare
    _kind text;
    _impl text;
begin
    if pg_catalog.jsonb_typeof(config) != 'object' then
        raise exception 'indexing config is not a jsonb object';
    end if;

    -- "is distinct from" also rejects a missing config_type (null)
    _kind = config operator(pg_catalog.->>) 'config_type';
    if _kind is distinct from 'indexing' then
        raise exception 'invalid config_type for indexing config';
    end if;

    _impl = config operator(pg_catalog.->>) 'implementation';
    if _impl is null then
        raise exception 'indexing implementation not specified';
    elsif _impl operator(pg_catalog.=) 'diskann' then
        perform ai._validate_indexing_diskann(config);
    elsif _impl operator(pg_catalog.=) 'hnsw' then
        perform ai._validate_indexing_hnsw(config);
    elsif _impl operator(pg_catalog.!=) 'none' then
        raise exception 'invalid indexing implementation: "%"', _impl;
    end if;
end
$func$ language plpgsql immutable security invoker
set search_path to pg_catalog, pg_temp
;



--------------------------------------------------------------------------------
-- 010-processing.sql

-------------------------------------------------------------------------------
-- processing_default
-- builds the 'default' processing config.
-- batch_size and concurrency are optional and omitted from the object when null.
create or replace function ai.processing_default
( batch_size int default null
, concurrency int default null
) returns jsonb
as $func$
    select pg_catalog.jsonb_strip_nulls
    ( pg_catalog.jsonb_build_object
      ( 'implementation', 'default'
      , 'config_type', 'processing'
      , 'batch_size', batch_size
      , 'concurrency', concurrency
      )
    )
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _validate_processing
create or replace function ai._validate_processing(config jsonb) returns void
as $func$
-- validates a processing config, raising on the first problem found:
--   * it must be a jsonb object whose config_type is 'processing'
--   * the implementation must be present and equal to 'default'
--   * optional batch_size must be an integer in [1, 2048]
--   * optional concurrency must be an integer in [1, 50]
-- values are compared as numeric: casting a json number straight to int
-- fails with "integer out of range" for huge values (hiding the intended
-- message) and silently rounds fractions such as 0.6 or 2048.4
declare
    _config_type text;
    _implementation text;
    _val jsonb;
    _num numeric;
begin
    if pg_catalog.jsonb_typeof(config) != 'object' then
        raise exception 'processing config is not a jsonb object';
    end if;

    _config_type = config operator ( pg_catalog.->> ) 'config_type';
    if _config_type is null or _config_type != 'processing' then
        raise exception 'invalid config_type for processing config';
    end if;
    _implementation = config operator(pg_catalog.->>) 'implementation';
    case _implementation
        when 'default' then
            _val = pg_catalog.jsonb_extract_path(config, 'batch_size');
            if _val is not null then
                -- a json null is not a number and is rejected here
                if pg_catalog.jsonb_typeof(_val) operator(pg_catalog.!=) 'number' then
                    raise exception 'batch_size must be a number';
                end if;
                _num = cast(_val as numeric);
                if _num operator(pg_catalog.<>) pg_catalog.trunc(_num) then
                    raise exception 'batch_size must be an integer';
                end if;
                if _num operator(pg_catalog.>) 2048 then
                    raise exception 'batch_size must be less than or equal to 2048';
                end if;
                if _num operator(pg_catalog.<) 1 then
                    raise exception 'batch_size must be greater than 0';
                end if;
            end if;

            _val = pg_catalog.jsonb_extract_path(config, 'concurrency');
            if _val is not null then
                if pg_catalog.jsonb_typeof(_val) operator(pg_catalog.!=) 'number' then
                    raise exception 'concurrency must be a number';
                end if;
                _num = cast(_val as numeric);
                if _num operator(pg_catalog.<>) pg_catalog.trunc(_num) then
                    raise exception 'concurrency must be an integer';
                end if;
                if _num operator(pg_catalog.>) 50 then
                    raise exception 'concurrency must be less than or equal to 50';
                end if;
                if _num operator(pg_catalog.<) 1 then
                    raise exception 'concurrency must be greater than 0';
                end if;
            end if;
        else
            if _implementation is null then
                raise exception 'processing implementation not specified';
            else
                raise exception 'unrecognized processing implementation: "%"', _implementation;
            end if;
    end case;
end
$func$ language plpgsql immutable security invoker
set search_path to pg_catalog, pg_temp
;


--------------------------------------------------------------------------------
-- 011-grant-to.sql
-------------------------------------------------------------------------------
-- grant_to
create or replace function ai.grant_to(variadic grantees name[]) returns name[]
as $func$
    -- returns the de-duplicated union of the explicitly passed grantees and the
    -- comma-separated role names in the 'ai.grant_to_default' setting (an unset
    -- setting contributes nothing). always returns an array, never null.
    -- blank entries (e.g. from 'a,,b' or a trailing comma) and null grantees are
    -- dropped: they would otherwise turn into invalid GRANT statements later
    select coalesce(pg_catalog.array_agg(cast(_.x as name)), array[]::name[])
    from (
        select pg_catalog.unnest(grantees) x
        union
        select pg_catalog.btrim(pg_catalog.string_to_table(pg_catalog.current_setting('ai.grant_to_default', true), ',')) x
    ) _
    -- also filters nulls, since null != '' is not true
    where _.x operator(pg_catalog.!=) ''
    ;
$func$ language sql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- grant_to
-- zero-argument convenience overload: delegates to the variadic ai.grant_to
-- with an empty list of explicit grantees.
create or replace function ai.grant_to() returns name[]
as $func$
    select ai.grant_to(variadic '{}'::name[])
$func$ language sql volatile security invoker
set search_path to pg_catalog, pg_temp
;


--------------------------------------------------------------------------------
-- 012-vectorizer-int.sql

-------------------------------------------------------------------------------
-- _vectorizer_source_pk
-- describes the primary key of source_table as a jsonb array with one object
-- per key column: { attnum, pknum, attname, typname }.
--   attnum  - the column's position in the table
--   pknum   - the column's 1-based position within the primary key
--   attname - the column name
--   typname - the bare pg_type name of the column (no schema, no typmod)
-- returns null when the table has no primary key (jsonb_agg over zero rows).
-- array element order is not specified; consumers sort by pknum or attnum.
create or replace function ai._vectorizer_source_pk(source_table regclass) returns jsonb as
$func$
    select pg_catalog.jsonb_agg
    ( pg_catalog.jsonb_build_object
      ( 'attnum', pk.attnum
      , 'pknum', pk.pknum
      , 'attname', col.attname
      , 'typname', typ.typname
      )
    )
    from pg_catalog.pg_constraint con
    cross join lateral pg_catalog.unnest(con.conkey) with ordinality as pk(attnum, pknum)
    inner join pg_catalog.pg_attribute col
        on (col.attrelid operator(pg_catalog.=) con.conrelid
            and col.attnum operator(pg_catalog.=) pk.attnum)
    inner join pg_catalog.pg_type typ on (typ.oid operator(pg_catalog.=) col.atttypid)
    where con.conrelid operator(pg_catalog.=) source_table
    and con.contype operator(pg_catalog.=) 'p'
$func$
language sql stable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _vectorizer_grant_to_source
create or replace function ai._vectorizer_grant_to_source
( source_schema name
, source_table name
, grant_to name[]
) returns void as
$func$
-- grants usage on source_schema and select on source_schema.source_table to
-- every role in grant_to. does nothing when grant_to is null, empty, or
-- contains only nulls.
declare
    _grantees text;
    _sql text;
begin
    -- render the roles once as a comma-separated list of quoted identifiers.
    -- string_agg skips null entries and yields null for an empty array; an
    -- empty array (what ai.grant_to() returns when nothing is configured)
    -- used to produce a dangling "... to " statement and a syntax error
    select pg_catalog.string_agg(pg_catalog.quote_ident(x), ', ')
    into _grantees
    from pg_catalog.unnest(grant_to) x
    ;
    if _grantees is null then
        return;
    end if;

    -- grant usage on source schema to grant_to roles
    select pg_catalog.format
    ( $sql$grant usage on schema %I to %s$sql$
    , source_schema
    , _grantees
    ) into strict _sql;
    execute _sql;

    -- grant select on source table to grant_to roles
    select pg_catalog.format
    ( $sql$grant select on %I.%I to %s$sql$
    , source_schema
    , source_table
    , _grantees
    ) into strict _sql;
    execute _sql;
end;
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _vectorizer_grant_to_vectorizer
create or replace function ai._vectorizer_grant_to_vectorizer(grant_to name[]) returns void as
$func$
-- grants usage on schema ai and select on ai.vectorizer to every role in
-- grant_to. does nothing when grant_to is null, empty, or contains only nulls.
declare
    _grantees text;
    _sql text;
begin
    -- render the roles once as a comma-separated list of quoted identifiers.
    -- string_agg skips null entries and yields null for an empty array; an
    -- empty array (what ai.grant_to() returns when nothing is configured)
    -- used to produce a dangling "... to " statement and a syntax error
    select pg_catalog.string_agg(pg_catalog.quote_ident(x), ', ')
    into _grantees
    from pg_catalog.unnest(grant_to) x
    ;
    if _grantees is null then
        return;
    end if;

    -- grant usage on schema ai to grant_to roles
    select pg_catalog.format
    ( $sql$grant usage on schema ai to %s$sql$
    , _grantees
    ) into strict _sql;
    execute _sql;

    -- grant select on vectorizer table to grant_to roles
    select pg_catalog.format
    ( $sql$grant select on ai.vectorizer to %s$sql$
    , _grantees
    ) into strict _sql;
    execute _sql;
end;
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _vectorizer_create_target_table
create or replace function ai._vectorizer_create_target_table
( source_schema name
, source_table name
, source_pk jsonb
, target_schema name
, target_table name
, dimensions int
, grant_to name[]
) returns void as
$func$
-- creates target_schema.target_table to hold one row per chunk of a source row:
-- a generated uuid key, copies of the source primary key columns, chunk_seq,
-- the chunk text and its embedding (a vector of the given dimensions). the
-- (pk, chunk_seq) pair is unique and the pk columns reference the source
-- table with on delete cascade. then grants usage on the target schema and
-- select/insert/update on the table to the grant_to roles, if there are any.
--   source_pk - jsonb array of { attnum, pknum, attname, typname } objects
--               as produced by ai._vectorizer_source_pk
-- NOTE(review): typname is interpolated unqualified and without typmod while
-- search_path is pinned to pg_catalog; a primary key of a user-defined type
-- living in another schema may fail to resolve here - confirm
declare
    _pk_cols text;
    _grantees text;
    _sql text;
begin
    -- quoted primary key column names in primary key order
    select pg_catalog.string_agg(pg_catalog.format('%I', x.attname), ', ' order by x.pknum)
    into strict _pk_cols
    from pg_catalog.jsonb_to_recordset(source_pk) x(pknum int, attname name)
    ;

    select pg_catalog.format
    ( $sql$
    create table %I.%I
    ( embedding_uuid uuid not null primary key default pg_catalog.gen_random_uuid()
    , %s
    , chunk_seq int not null
    , chunk text not null
    , embedding @extschema:vector@.vector(%L) storage main not null
    , unique (%s, chunk_seq)
    , foreign key (%s) references %I.%I (%s) on delete cascade
    )
    $sql$
    , target_schema, target_table
    , (
        select pg_catalog.string_agg
        (
            pg_catalog.format
            ( '%I %s not null'
            , x.attname
            , x.typname
            )
            , E'\n, '
            order by x.attnum
        )
        from pg_catalog.jsonb_to_recordset(source_pk)
            x(attnum int, attname name, typname name)
      )
    , dimensions
    , _pk_cols
    , _pk_cols
    , source_schema, source_table
    , _pk_cols
    ) into strict _sql
    ;
    execute _sql;

    -- render the roles once as a comma-separated list of quoted identifiers.
    -- string_agg skips null entries and yields null for an empty array; an
    -- empty array (what ai.grant_to() returns when nothing is configured)
    -- used to produce a dangling "... to " statement and a syntax error
    select pg_catalog.string_agg(pg_catalog.quote_ident(x), ', ')
    into _grantees
    from pg_catalog.unnest(grant_to) x
    ;
    if _grantees is null then
        return;
    end if;

    -- grant usage on target schema to grant_to roles
    select pg_catalog.format
    ( $sql$grant usage on schema %I to %s$sql$
    , target_schema
    , _grantees
    ) into strict _sql;
    execute _sql;

    -- grant select, insert, update on target table to grant_to roles
    select pg_catalog.format
    ( $sql$grant select, insert, update on %I.%I to %s$sql$
    , target_schema
    , target_table
    , _grantees
    ) into strict _sql;
    execute _sql;
end;
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _vectorizer_create_view
create or replace function ai._vectorizer_create_view
( view_schema name
, view_name name
, source_schema name
, source_table name
, source_pk jsonb
, target_schema name
, target_table name
, grant_to name[]
) returns void as
$func$
-- creates view_schema.view_name, which exposes every row of the target table
-- (embedding_uuid, chunk_seq, chunk, embedding) together with all columns of
-- the matching source row, left-joined on the primary key columns. then
-- grants usage on the view schema and select on the view to the grant_to
-- roles, if there are any.
--   source_pk - jsonb array of { attnum, pknum, attname, typname } objects
--               as produced by ai._vectorizer_source_pk
declare
    _grantees text;
    _sql text;
begin
    select pg_catalog.format
    ( $sql$
    create view %I.%I as
    select
      t.embedding_uuid
    , t.chunk_seq
    , t.chunk
    , t.embedding
    , %s
    from %I.%I t
    left outer join %I.%I s
    on (%s)
    $sql$
    , view_schema, view_name
    , (
        -- take primary keys from the target table and other columns from source
        -- this allows for join removal optimization
        select pg_catalog.string_agg
        (
            pg_catalog.format
            ( '%s.%I'
            , case when x.attnum is not null then 't' else 's' end
            , a.attname
            )
            , E'\n    , '
            order by a.attnum
        )
        from pg_catalog.pg_attribute a
        left outer join pg_catalog.jsonb_to_recordset(source_pk) x(attnum int) on (a.attnum operator(pg_catalog.=) x.attnum)
        where a.attrelid operator(pg_catalog.=) pg_catalog.format('%I.%I', source_schema, source_table)::regclass::oid
        and a.attnum operator(pg_catalog.>) 0
        and not a.attisdropped
      )
    , target_schema, target_table
    , source_schema, source_table
    , (
        -- join on every primary key column. the names must be quoted with %I
        -- (as everywhere else in this file); plain %s broke on mixed-case,
        -- spaced, or reserved-word column names
        select pg_catalog.string_agg
        (
            pg_catalog.format
            ( 't.%I = s.%I'
            , x.attname
            , x.attname
            )
            , ' and '
            order by x.pknum
        )
        from pg_catalog.jsonb_to_recordset(source_pk)
            x(pknum int, attname name)
      )
    ) into strict _sql;
    execute _sql;

    -- render the roles once as a comma-separated list of quoted identifiers.
    -- string_agg skips null entries and yields null for an empty array; an
    -- empty array (what ai.grant_to() returns when nothing is configured)
    -- used to produce a dangling "... to " statement and a syntax error
    select pg_catalog.string_agg(pg_catalog.quote_ident(x), ', ')
    into _grantees
    from pg_catalog.unnest(grant_to) x
    ;
    if _grantees is null then
        return;
    end if;

    -- grant usage on view schema to grant_to roles
    select pg_catalog.format
    ( $sql$grant usage on schema %I to %s$sql$
    , view_schema
    , _grantees
    ) into strict _sql;
    execute _sql;

    -- grant select on view to grant_to roles
    select pg_catalog.format
    ( $sql$grant select on %I.%I to %s$sql$
    , view_schema
    , view_name
    , _grantees
    ) into strict _sql;
    execute _sql;
end
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _vectorizer_create_dependencies
create or replace function ai._vectorizer_create_dependencies(vectorizer_id pg_catalog.int4)
returns void as
$func$
-- records pg_depend entries (deptype 'n') linking a vectorizer's tables:
--   queue  -> source, queue -> target, target -> source
-- so that dropping the source or target with cascade also drops the dependents.
-- only the owner of the source table (checked against session_user) may call it.
declare
    _vec ai.vectorizer%rowtype;
    _is_owner pg_catalog.bool;
    _pg_class_oid pg_catalog.oid;
    _source_oid pg_catalog.oid;
    _queue_oid pg_catalog.oid;
    _target_oid pg_catalog.oid;
begin
    -- this function is security definer since we need to insert into a catalog table
    -- fully-qualify everything and be careful of security holes

    -- we don't want to run this function on arbitrary tables, so we don't take
    -- schema/table names as parameters. we take a vectorizer id and look it up
    -- preventing this function from being abused
    select v.* into strict _vec
    from ai.vectorizer v
    where v.id operator(pg_catalog.=) vectorizer_id
    ;

    _source_oid = pg_catalog.format('%I.%I', _vec.source_schema, _vec.source_table)::pg_catalog.regclass::pg_catalog.oid;

    -- don't let anyone but the owner of the source table call this
    select k.relowner operator(pg_catalog.=) pg_catalog.session_user()::pg_catalog.regrole
    into strict _is_owner
    from pg_catalog.pg_class k
    where k.oid operator(pg_catalog.=) _source_oid
    ;
    if not _is_owner then
        raise exception 'only the owner of the source table may call ai._vectorizer_create_dependencies';
    end if;

    _queue_oid = pg_catalog.format('%I.%I', _vec.queue_schema, _vec.queue_table)::pg_catalog.regclass::pg_catalog.oid;
    _target_oid = pg_catalog.format('%I.%I', _vec.target_schema, _vec.target_table)::pg_catalog.regclass::pg_catalog.oid;

    -- the oid of pg_class itself is the classid/refclassid of every table-level
    -- dependency. resolve it through a schema-qualified regclass cast instead of
    -- looking it up by bare relname: any user table named "pg_class" in another
    -- schema made that lookup return several rows and broke this function
    _pg_class_oid = 'pg_catalog.pg_class'::pg_catalog.regclass::pg_catalog.oid;

    -- if we drop the source or the target with `cascade` it should drop the queue
    -- if we drop the source with `cascade` it should drop the target
    -- there's no unique constraint on pg_depend so we manually prevent duplicate entries
    with x (classid, objid, objsubid, refclassid, refobjid, refobjsubid, deptype) as
    ( values
      -- the queue table depends on the source table
      (_pg_class_oid, _queue_oid, 0, _pg_class_oid, _source_oid, 0, 'n'::pg_catalog."char")
      -- the queue table depends on the target table
    , (_pg_class_oid, _queue_oid, 0, _pg_class_oid, _target_oid, 0, 'n'::pg_catalog."char")
      -- the target table depends on the source table
    , (_pg_class_oid, _target_oid, 0, _pg_class_oid, _source_oid, 0, 'n'::pg_catalog."char")
    )
    insert into pg_catalog.pg_depend
    ( classid
    , objid
    , objsubid
    , refclassid
    , refobjid
    , refobjsubid
    , deptype
    )
    select
      x.classid
    , x.objid
    , x.objsubid
    , x.refclassid
    , x.refobjid
    , x.refobjsubid
    , x.deptype
    from x
    where not exists
    (
        select 1
        from pg_catalog.pg_depend d
        where d.classid operator(pg_catalog.=) x.classid
        and d.objid operator(pg_catalog.=) x.objid
        and d.objsubid operator(pg_catalog.=) x.objsubid
        and d.refclassid operator(pg_catalog.=) x.refclassid
        and d.refobjid operator(pg_catalog.=) x.refobjid
        and d.refobjsubid operator(pg_catalog.=) x.refobjsubid
        and d.deptype operator(pg_catalog.=) x.deptype
    )
    ;
end
$func$
language plpgsql volatile security definer -- definer on purpose
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _vectorizer_create_queue_table
create or replace function ai._vectorizer_create_queue_table
( queue_schema name
, queue_table name
, source_pk jsonb
, grant_to name[]
) returns void as
$func$
-- creates queue_schema.queue_table with a copy of each source primary key
-- column plus a queued_at timestamp, and an index over the key columns in
-- primary key order. then grants usage on the queue schema and
-- select/insert/update/delete on the table to the grant_to roles, if any.
--   source_pk - jsonb array of { attnum, pknum, attname, typname } objects
--               as produced by ai._vectorizer_source_pk
declare
    _grantees text;
    _sql text;
begin
    -- create the table
    select pg_catalog.format
    ( $sql$create table %I.%I(%s, queued_at timestamptz not null default pg_catalog.now())$sql$
    , queue_schema, queue_table
    , (
        select pg_catalog.string_agg
        (
          pg_catalog.format
          ( '%I %s not null'
          , x.attname
          , x.typname
          )
          , E'\n, '
          order by x.attnum
        )
        from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name, typname name)
      )
    ) into strict _sql
    ;
    execute _sql;

    -- create the index
    select pg_catalog.format
    ( $sql$create index on %I.%I (%s)$sql$
    , queue_schema, queue_table
    , (
        select pg_catalog.string_agg(pg_catalog.format('%I', x.attname), ', ' order by x.pknum)
        from pg_catalog.jsonb_to_recordset(source_pk) x(pknum int, attname name)
      )
    ) into strict _sql
    ;
    execute _sql;

    -- render the roles once as a comma-separated list of quoted identifiers.
    -- string_agg skips null entries and yields null for an empty array; an
    -- empty array (what ai.grant_to() returns when nothing is configured)
    -- used to produce a dangling "... to " statement and a syntax error
    select pg_catalog.string_agg(pg_catalog.quote_ident(x), ', ')
    into _grantees
    from pg_catalog.unnest(grant_to) x
    ;
    if _grantees is null then
        return;
    end if;

    -- grant usage on queue schema to grant_to roles
    select pg_catalog.format
    ( $sql$grant usage on schema %I to %s$sql$
    , queue_schema
    , _grantees
    ) into strict _sql;
    execute _sql;

    -- grant select, insert, update, delete on queue table to grant_to roles
    select pg_catalog.format
    ( $sql$grant select, insert, update, delete on %I.%I to %s$sql$
    , queue_schema
    , queue_table
    , _grantees
    ) into strict _sql;
    execute _sql;
end;
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _vectorizer_create_source_trigger
create or replace function ai._vectorizer_create_source_trigger
( trigger_name name
, queue_schema name
, queue_table name
, source_schema name
, source_table name
, source_pk jsonb
) returns void as
$func$
-- creates a row-level "after insert or update" trigger on the source table
-- that copies the primary key of each affected row into the queue table.
--   trigger_name - used both as the trigger name and as the name of the
--                  trigger function, which is created in queue_schema
--   source_pk    - jsonb array of { attnum, attname, ... } objects as produced
--                  by ai._vectorizer_source_pk
declare
    _sql text;
begin
    -- create the trigger function
    -- the trigger function is security definer
    -- the owner of the source table is creating the trigger function
    -- so the trigger function is run as the owner of the source table
    -- who also owns the queue table
    -- this means anyone with insert/update on the source is able
    -- to enqueue rows in the queue table automatically
    -- since the trigger function only does inserts, this should be safe
    -- the function writes to the database, so it must be parallel unsafe
    -- (it was previously mislabelled parallel safe)
    select pg_catalog.format
    ( $sql$
    create function %I.%I() returns trigger
    as $plpgsql$
    begin
        insert into %I.%I (%s)
        values (%s);
        return null;
    end;
    $plpgsql$ language plpgsql volatile parallel unsafe security definer
    set search_path to pg_catalog, pg_temp
    $sql$
    , queue_schema, trigger_name
    , queue_schema, queue_table
    , (
        select pg_catalog.string_agg(pg_catalog.format('%I', x.attname), ', ' order by x.attnum)
        from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name)
      )
    , (
        select pg_catalog.string_agg(pg_catalog.format('new.%I', x.attname), ', ' order by x.attnum)
        from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name)
      )
    ) into strict _sql
    ;
    execute _sql;

    -- revoke all on trigger function from public
    select pg_catalog.format
    ( $sql$
    revoke all on function %I.%I() from public
    $sql$
    , queue_schema, trigger_name
    ) into strict _sql
    ;
    execute _sql;

    -- create the trigger on the source table
    select pg_catalog.format
    ( $sql$
    create trigger %I
    after insert or update
    on %I.%I
    for each row execute function %I.%I();
    $sql$
    , trigger_name
    , source_schema, source_table
    , queue_schema, trigger_name
    ) into strict _sql
    ;
    execute _sql;
end;
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _vectorizer_vector_index_exists
-- returns true when target_schema.target_table has an index whose first key
-- column is "embedding" and whose definition text contains
-- " using <implementation> " (implementation = indexing->>'implementation')
-- raises when implementation is set to anything other than diskann or hnsw
create or replace function ai._vectorizer_vector_index_exists
( target_schema name
, target_table name
, indexing jsonb
) returns bool as
$func$
declare
    _implementation text;
    _found bool;
begin
    _implementation = pg_catalog.jsonb_extract_path_text(indexing, 'implementation');
    if _implementation not in ('diskann', 'hnsw') then
        -- RAISE uses a bare % as its placeholder (not printf-style %s)
        raise exception 'unrecognized index implementation: %', _implementation;
    end if;

    -- look for an index on the target table where the indexed column is the "embedding" column
    -- and the index is using the correct implementation
    -- exists() stops at the first matching index instead of counting them all
    select exists
    ( select 1
      from pg_catalog.pg_class k
      inner join pg_catalog.pg_namespace n on (k.relnamespace operator(pg_catalog.=) n.oid)
      inner join pg_catalog.pg_index i on (k.oid operator(pg_catalog.=) i.indrelid)
      inner join pg_catalog.pg_attribute a
          on (k.oid operator(pg_catalog.=) a.attrelid
          and a.attname operator(pg_catalog.=) 'embedding'
          and a.attnum operator(pg_catalog.=) i.indkey[0]
          )
      where n.nspname operator(pg_catalog.=) target_schema
      and k.relname operator(pg_catalog.=) target_table
      and pg_catalog.pg_get_indexdef(i.indexrelid)
          ilike pg_catalog.concat('% using ', _implementation, ' %')
    ) into strict _found
    ;
    return coalesce(_found, false);
end
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _vectorizer_should_create_vector_index
-- decides whether the vector index for this vectorizer should be built now
-- returns false when: there is no indexing config; its implementation is
-- missing or 'none'; the index already exists; create_when_queue_empty
-- (default true) is set and the queue still has rows; or the target table
-- has fewer than indexing->>'min_rows' rows (default 0)
create or replace function ai._vectorizer_should_create_vector_index(vectorizer ai.vectorizer) returns boolean
as $func$
declare
    _indexing jsonb;
    _implementation text;
    _create_when_queue_empty bool;
    _sql text;
    _count bigint;
    _min_rows bigint;
begin
    -- grab the indexing config
    _indexing = pg_catalog.jsonb_extract_path(vectorizer.config, 'indexing');
    if _indexing is null then
        return false;
    end if;

    -- grab the indexing config's implementation
    _implementation = pg_catalog.jsonb_extract_path_text(_indexing, 'implementation');
    -- if implementation is missing or none, exit
    if _implementation is null or _implementation = 'none' then
        return false;
    end if;

    -- see if the index already exists. if so, exit
    if ai._vectorizer_vector_index_exists(vectorizer.target_schema, vectorizer.target_table, _indexing) then
        return false;
    end if;

    -- if flag set, only attempt to create the vector index if the queue table is empty
    _create_when_queue_empty = coalesce(pg_catalog.jsonb_extract_path(_indexing, 'create_when_queue_empty')::boolean, true);
    if _create_when_queue_empty then
        -- probe for at least one queued row
        -- the limit must sit inside the subquery: "select count(1) from q limit 1"
        -- limits the single aggregate output row, not the scan, and so counts
        -- the entire queue table
        select pg_catalog.format
        ( $sql$select pg_catalog.count(*) from (select 1 from %I.%I limit 1) x$sql$
        , vectorizer.queue_schema
        , vectorizer.queue_table
        ) into strict _sql
        ;
        execute _sql into _count;
        if _count operator(pg_catalog.>) 0 then
            raise notice 'queue for %.% is not empty. skipping vector index creation', vectorizer.target_schema, vectorizer.target_table;
            return false;
        end if;
    end if;

    -- if min_rows has a value
    _min_rows = coalesce(pg_catalog.jsonb_extract_path_text(_indexing, 'min_rows')::bigint, 0);
    if _min_rows > 0 then
        -- count the rows in the target table, scanning no more than min_rows of them
        select pg_catalog.format
        ( $sql$select pg_catalog.count(*) from (select 1 from %I.%I limit %L) x$sql$
        , vectorizer.target_schema
        , vectorizer.target_table
        , _min_rows
        ) into strict _sql
        ;
        execute _sql into _count;
    end if;

    -- if we have met or exceeded min_rows, create the index
    return coalesce(_count, 0) >= _min_rows;
end
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _vectorizer_create_vector_index
-- builds the vector index on target_schema.target_table (embedding) described
-- by indexing: implementation 'diskann' or 'hnsw' plus implementation-specific
-- options that are passed through as the index's WITH (...) storage parameters
-- a transaction-level advisory lock (fixed first key, target table oid as the
-- second key) keeps two sessions from building the same index at once; the
-- session that does not get the lock warns and returns
-- returns without building when a matching index already exists
-- raises for an unrecognized implementation
create or replace function ai._vectorizer_create_vector_index
( target_schema name
, target_table name
, indexing jsonb
) returns void as
$func$
declare
    _key1 int = 1982010642;
    _key2 int;
    _implementation text;
    _with_count bigint;
    _with text;
    _ext_schema name;
    _sql text;
begin

    -- use the target table's oid as the second key for the advisory lock
    select k.oid::int into strict _key2
    from pg_catalog.pg_class k
    inner join pg_catalog.pg_namespace n on (k.relnamespace operator(pg_catalog.=) n.oid)
    where k.relname operator(pg_catalog.=) target_table
    and n.nspname operator(pg_catalog.=) target_schema
    ;

    -- try to grab a transaction-level advisory lock specific to the target table
    -- if we get it, no one else is building the vector index. proceed
    -- if we don't get it, someone else is already working on it. abort
    if not pg_catalog.pg_try_advisory_xact_lock(_key1, _key2) then
        raise warning 'another process is already building a vector index on %.%', target_schema, target_table;
        return;
    end if;

    -- double-check that the index doesn't exist now that we're holding the advisory lock
    -- nobody likes redundant indexes
    -- (schema first, then table: the argument order of _vectorizer_vector_index_exists)
    if ai._vectorizer_vector_index_exists(target_schema, target_table, indexing) then
        raise notice 'the vector index on %.% already exists', target_schema, target_table;
        return;
    end if;

    _implementation = pg_catalog.jsonb_extract_path_text(indexing, 'implementation');
    case _implementation
        when 'diskann' then
            -- storage_layout is quoted as a literal, max_alpha cast to float8,
            -- every other recognized option cast to int
            select
              pg_catalog.count(*)
            , pg_catalog.string_agg
              ( case w.key
                  when 'storage_layout' then pg_catalog.format('%s=%L', w.key, w.value)
                  when 'max_alpha' then pg_catalog.format('%s=%s', w.key, w.value::float8)
                  else pg_catalog.format('%s=%s', w.key, w.value::int)
                end
              , ', '
              )
            into strict
              _with_count
            , _with
            from pg_catalog.jsonb_each_text(indexing) w
            where w.key in
            ( 'storage_layout'
            , 'num_neighbors'
            , 'search_list_size'
            , 'max_alpha'
            , 'num_dimensions'
            , 'num_bits_per_dimension'
            )
            ;

            select pg_catalog.format
            ( $sql$create index on %I.%I using diskann (embedding)%s$sql$
            , target_schema, target_table
            , case when _with_count operator(pg_catalog.>) 0
                then pg_catalog.format(' with (%s)', _with)
                else ''
              end
            ) into strict _sql;
            execute _sql;
        when 'hnsw' then
            select
              pg_catalog.count(*)
            , pg_catalog.string_agg(pg_catalog.format('%s=%s', w.key, w.value::int), ', ')
            into strict
              _with_count
            , _with
            from pg_catalog.jsonb_each_text(indexing) w
            where w.key in ('m', 'ef_construction')
            ;

            -- the operator class lives in the schema of the "vector" extension
            select n.nspname into strict _ext_schema
            from pg_catalog.pg_extension x
            inner join pg_catalog.pg_namespace n on (x.extnamespace operator(pg_catalog.=) n.oid)
            where x.extname operator(pg_catalog.=) 'vector'
            ;

            -- the opclass comes from stored config: quote it as an identifier
            -- rather than splicing the raw text into the DDL
            select pg_catalog.format
            ( $sql$create index on %I.%I using hnsw (embedding %I.%I)%s$sql$
            , target_schema, target_table
            , _ext_schema
            , indexing operator(pg_catalog.->>) 'opclass'
            , case when _with_count operator(pg_catalog.>) 0
                then pg_catalog.format(' with (%s)', _with)
                else ''
              end
            ) into strict _sql;
            execute _sql;
        else
            raise exception 'unrecognized index implementation: %', _implementation;
    end case;
end
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _vectorizer_job
-- background job body for one vectorizer; config must carry "vectorizer_id"
-- (ai._vectorizer_schedule_job registers it with config => {"vectorizer_id": id})
-- each run:
--   1. builds the vector index if ai._vectorizer_should_create_vector_index says so
--   2. if the queue has at least one row not locked by another transaction,
--      calls ai.execute_vectorizer once per 50 queued items, at most 10 times
-- job_id is only used in a debug message
-- this is a procedure so it can commit between steps. a procedure with a SET
-- clause cannot execute transaction control statements, so search_path is
-- pinned with "set local" at the start and again after every commit
create or replace procedure ai._vectorizer_job
( job_id int default null
, config jsonb default null
) as
$func$
declare
    _vectorizer_id int;
    _vec ai.vectorizer%rowtype;
    _sql text;
    _found bool;
    _count bigint;
begin
    set local search_path = pg_catalog, pg_temp;
    if config is null then
        raise exception 'config is null';
    end if;

    -- get the vectorizer id from the config
    select pg_catalog.jsonb_extract_path_text(config, 'vectorizer_id')::int
    into strict _vectorizer_id
    ;

    -- get the vectorizer
    select * into strict _vec
    from ai.vectorizer v
    where v.id operator(pg_catalog.=) _vectorizer_id
    ;

    commit;
    set local search_path = pg_catalog, pg_temp;

    -- if the conditions are right, create the vectorizer index
    if ai._vectorizer_should_create_vector_index(_vec) then
        commit;
        set local search_path = pg_catalog, pg_temp;
        perform ai._vectorizer_create_vector_index
        (_vec.target_schema
        , _vec.target_table
        , pg_catalog.jsonb_extract_path(_vec.config, 'indexing')
        );
    end if;

    commit;
    set local search_path = pg_catalog, pg_temp;

    -- if there is at least one item in the queue, we need to execute the vectorizer
    -- skip locked: rows currently locked by other transactions are ignored
    select pg_catalog.format
    ( $sql$
    select true
    from %I.%I
    for update skip locked
    limit 1
    $sql$
    , _vec.queue_schema, _vec.queue_table
    ) into strict _sql
    ;
    execute _sql into _found;
    commit;
    set local search_path = pg_catalog, pg_temp;
    if coalesce(_found, false) is true then
        -- count items in the queue, scanning at most 501 rows
        -- (enough to hit the cap of 10 executions below)
        -- the subquery is aliased: PostgreSQL before 16 rejects an unaliased subquery in FROM
        select pg_catalog.format
        ( $sql$select pg_catalog.count(1) from (select 1 from %I.%I limit 501) x$sql$
        , _vec.queue_schema, _vec.queue_table
        ) into strict _sql
        ;
        execute _sql into strict _count;
        commit;
        set local search_path = pg_catalog, pg_temp;
        -- for every 50 items in the queue, execute a vectorizer max out at 10 vectorizers
        _count = least(pg_catalog.ceil(_count::float8 / 50.0::float8), 10::float8)::bigint;
        raise debug 'job_id %: executing % vectorizers...', job_id, _count;
        while _count > 0 loop
            -- execute the vectorizer
            perform ai.execute_vectorizer(_vectorizer_id);
            _count = _count - 1;
        end loop;
    end if;
    commit;
    set local search_path = pg_catalog, pg_temp;
end
$func$
language plpgsql security invoker
;

-------------------------------------------------------------------------------
-- _vectorizer_schedule_job
-- schedules ai._vectorizer_job for vectorizer_id as described by scheduling
-- returns the timescaledb job id, or null when implementation is 'none'
-- raises when timescaledb is requested but not installed, or when the
-- implementation is anything else (including missing)
-- only schedule_interval, initial_start, fixed_schedule and timezone are
-- forwarded to add_job, in that order, as named arguments
create or replace function ai._vectorizer_schedule_job
( vectorizer_id int
, scheduling jsonb
) returns bigint as
$func$
declare
    _implementation text;
    _sql text;
    _extension_schema name;
    _job_id bigint;
begin
    select pg_catalog.jsonb_extract_path_text(scheduling, 'implementation')
    into strict _implementation
    ;
    case
        when _implementation operator(pg_catalog.=) 'timescaledb' then
            -- look up the schema timescaledb is installed in. may be null
            select n.nspname into _extension_schema
            from pg_catalog.pg_extension x
            inner join pg_catalog.pg_namespace n on (x.extnamespace operator(pg_catalog.=) n.oid)
            where x.extname operator(pg_catalog.=) _implementation
            ;
            if _extension_schema is null then
                raise exception 'timescaledb extension not found';
            end if;

            -- schedule the work proc with timescaledb background jobs
            select pg_catalog.format
            ( $$select %I.add_job('ai._vectorizer_job'::regproc, %s, config=>%L)$$
            , _extension_schema
            , ( -- gather up the recognized arguments in a fixed order
                select pg_catalog.string_agg
                ( pg_catalog.format('%s=>%L', s.key, s.value)
                , ', '
                order by x.ord
                )
                from pg_catalog.jsonb_each_text(scheduling) s
                inner join
                pg_catalog.unnest(array['schedule_interval', 'initial_start', 'fixed_schedule', 'timezone']) with ordinality x(key, ord)
                on (s.key operator(pg_catalog.=) x.key)
              )
            , pg_catalog.jsonb_build_object('vectorizer_id', vectorizer_id)::text
            ) into strict _sql
            ;
            execute _sql into strict _job_id;
        when _implementation operator(pg_catalog.=) 'none' then
            return null;
        else
            raise exception 'scheduling implementation not recognized';
    end case;
    return _job_id;
end
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _vectorizer_handle_drops
-- sql_drop event trigger function: for every vectorizer whose source, target
-- or queue table appears among the dropped tables, drop that vectorizer
-- through ai.drop_vectorizer (which leaves the target table and view in place
-- unless drop_all is requested)
create or replace function ai._vectorizer_handle_drops()
returns event_trigger as
$func$
declare
    _affected_vectorizer int;
begin
    -- SECURITY DEFINER: every function and operator is schema-qualified and
    -- search_path is pinned below, so callers cannot redirect name lookup
    for _affected_vectorizer in
        select distinct vec.id
        from pg_catalog.pg_event_trigger_dropped_objects() dropped
        inner join ai.vectorizer vec
            on ((dropped.schema_name, dropped.object_name) in
                ( (vec.source_schema, vec.source_table)
                , (vec.target_schema, vec.target_table)
                , (vec.queue_schema, vec.queue_table)
                )
            )
        where pg_catalog.lower(dropped.object_type) operator(pg_catalog.=) 'table'
    loop
        -- drop_vectorizer drops tables itself, so this trigger can fire
        -- recursively; that recursion is harmless here
        raise notice 'associated table for vectorizer % dropped. dropping vectorizer', _affected_vectorizer;
        perform ai.drop_vectorizer(_affected_vectorizer);
    end loop;
end;
$func$
language plpgsql volatile security definer -- definer on purpose!
set search_path to pg_catalog, pg_temp
;

-- install the sql_drop event trigger unless one named _vectorizer_handle_drops
-- already exists and is bound to ai._vectorizer_handle_drops
do language plpgsql $block$
begin
    if not exists
    ( select 1
      from pg_catalog.pg_event_trigger g
      where g.evtname operator(pg_catalog.=) '_vectorizer_handle_drops'
      and g.evtfoid operator(pg_catalog.=) pg_catalog.to_regproc('ai._vectorizer_handle_drops')
    ) then
        create event trigger _vectorizer_handle_drops
        on sql_drop
        execute function ai._vectorizer_handle_drops();
    end if;
end
$block$;


--------------------------------------------------------------------------------
-- 013-vectorizer-api.sql


-------------------------------------------------------------------------------
-- execute_vectorizer
-- delegates to the python function ai.vectorizer.execute_vectorizer(plpy, vectorizer_id)
-- on first use in a session, <ai.python_lib_dir or /usr/local/lib/pgai>/0.4.1 is
-- added to the python path and the library version is checked; a session that
-- already loaded a different version is ended with plpy.fatal
create or replace function ai.execute_vectorizer(vectorizer_id int) returns void
as $python$
    # GD is the session-wide dictionary shared by all PL/Python functions;
    # "ai.version" records which library version was put on the path
    if "ai.version" in GD:
        if GD["ai.version"] != "0.4.1":
            plpy.fatal("the pgai extension version has changed. start a new session")
    else:
        from pathlib import Path
        import site
        rows = plpy.execute("select coalesce(pg_catalog.current_setting('ai.python_lib_dir', true), '/usr/local/lib/pgai') as python_lib_dir")
        versioned_dir = Path(rows[0]["python_lib_dir"]).joinpath("0.4.1")
        site.addsitedir(str(versioned_dir))
        from ai import __version__ as ai_version
        assert ai_version == "0.4.1"
        GD["ai.version"] = "0.4.1"
    import ai.vectorizer
    ai.vectorizer.execute_vectorizer(plpy, vectorizer_id)
$python$
language plpython3u volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- create_vectorizer
-- creates a vectorizer on the table `source` and returns its new id
-- steps, in order: validate grant_to and every config; grant select on the
-- source to grant_to; create the target table, the queue table, the trigger
-- on the source that enqueues primary keys on insert/update, and the view;
-- schedule the background job; insert the row into ai.vectorizer; record
-- dependencies; grant on the vectorizer table; and, when enqueue_existing,
-- enqueue the primary key of every existing source row
-- requirements (each raises otherwise): caller owns the source table, the
-- source has a primary key, embedding and chunking are given, embedding has
-- "dimensions", and the view/target/queue names are not already taken
-- naming: destination, when given, names the view and (suffixed "_store") the
-- target table; explicit target_*/view_*/queue_* arguments take precedence
create or replace function ai.create_vectorizer
( source regclass
, destination name default null
, embedding jsonb default null
, chunking jsonb default null
, indexing jsonb default ai.indexing_default()
, formatting jsonb default ai.formatting_python_template()
, scheduling jsonb default ai.scheduling_default()
, processing jsonb default ai.processing_default()
, target_schema name default null
, target_table name default null
, view_schema name default null
, view_name name default null
, queue_schema name default null
, queue_table name default null
, grant_to name[] default ai.grant_to()
, enqueue_existing bool default true
) returns int
as $func$
declare
    _missing_roles name[];
    _source_table name;
    _source_schema name;
    _trigger_name name;
    _is_owner bool;
    _dimensions int;
    _source_pk jsonb;
    _vectorizer_id int;
    _sql text;
    _job_id bigint;
begin
    -- make sure all the roles listed in grant_to exist
    -- nonexistent roles only produce a warning; grant_to is overwritten with
    -- just the roles that do exist (null if none do)
    if grant_to is not null then
        select
          pg_catalog.array_agg(r) filter (where pg_catalog.to_regrole(r) is null) -- missing
        , pg_catalog.array_agg(r) filter (where pg_catalog.to_regrole(r) is not null) -- real roles
        into strict
          _missing_roles
        , grant_to
        from pg_catalog.unnest(grant_to) r
        ;
        if pg_catalog.array_length(_missing_roles, 1) > 0 then
            raise warning 'one or more grant_to roles do not exist: %', _missing_roles;
        end if;
    end if;
    
    if embedding is null then
        raise exception 'embedding configuration is required';
    end if;
    
    if chunking is null then
        raise exception 'chunking configuration is required';
    end if;

    -- get source table name and schema name
    -- _is_owner compares the owner with current_user exactly; membership in the
    -- owning role is not enough (see the TODO below)
    select k.relname, n.nspname, k.relowner operator(pg_catalog.=) current_user::regrole
    into strict _source_table, _source_schema, _is_owner
    from pg_catalog.pg_class k
    inner join pg_catalog.pg_namespace n on (k.relnamespace operator(pg_catalog.=) n.oid)
    where k.oid operator(pg_catalog.=) source
    ;

    -- TODO: consider allowing (in)direct members of the role that owns the source table
    if not _is_owner then
        raise exception 'only the owner of the source table may create a vectorizer on it';
    end if;

    -- the vector width of the target table's embedding column comes from embedding->'dimensions'
    select (embedding operator(pg_catalog.->) 'dimensions')::int into _dimensions;
    if _dimensions is null then
        raise exception '_dimensions argument is required';
    end if;

    -- get the source table's primary key definition
    -- (consumed below as a jsonb array of {attnum, attname} objects)
    select ai._vectorizer_source_pk(source) into strict _source_pk;
    if _source_pk is null or pg_catalog.jsonb_array_length(_source_pk) = 0 then
        raise exception 'source table must have a primary key constraint';
    end if;

    -- allocate the id up front: the trigger name and default queue table name embed it
    _vectorizer_id = pg_catalog.nextval('ai.vectorizer_id_seq'::pg_catalog.regclass);
    target_schema = coalesce(target_schema, _source_schema);
    target_table = case
        when target_table is not null then target_table
        when destination is not null then pg_catalog.concat(destination, '_store')
        else pg_catalog.concat(_source_table, '_embedding_store')
    end;
    view_schema = coalesce(view_schema, _source_schema);
    view_name = case
        when view_name is not null then view_name
        when destination is not null then destination
        else pg_catalog.concat(_source_table, '_embedding')
    end;
    _trigger_name = pg_catalog.concat('_vectorizer_src_trg_', _vectorizer_id);
    queue_schema = coalesce(queue_schema, 'ai');
    queue_table = coalesce(queue_table, pg_catalog.concat('_vectorizer_q_', _vectorizer_id));

    -- make sure view name is available
    if pg_catalog.to_regclass(pg_catalog.format('%I.%I', view_schema, view_name)) is not null then
        raise exception 'an object named %.% already exists. specify an alternate destination explicitly', view_schema, view_name;
    end if;

    -- make sure target table name is available
    if pg_catalog.to_regclass(pg_catalog.format('%I.%I', target_schema, target_table)) is not null then
        raise exception 'an object named %.% already exists. specify an alternate destination or target_table explicitly', target_schema, target_table;
    end if;

    -- make sure queue table name is available
    if pg_catalog.to_regclass(pg_catalog.format('%I.%I', queue_schema, queue_table)) is not null then
        raise exception 'an object named %.% already exists. specify an alternate queue_table explicitly', queue_schema, queue_table;
    end if;

    -- validate the embedding config
    perform ai._validate_embedding(embedding);

    -- validate the chunking config
    perform ai._validate_chunking(chunking, _source_schema, _source_table);

    -- if ai.indexing_default, resolve the default
    if indexing operator(pg_catalog.->>) 'implementation' = 'default' then
        indexing = ai._resolve_indexing_default();
    end if;

    -- validate the indexing config
    perform ai._validate_indexing(indexing);

    -- validate the formatting config
    perform ai._validate_formatting(formatting, _source_schema, _source_table);

    -- if ai.scheduling_default, resolve the default
    if scheduling operator(pg_catalog.->>) 'implementation' = 'default' then
        scheduling = ai._resolve_scheduling_default();
    end if;

    -- validate the scheduling config
    perform ai._validate_scheduling(scheduling);

    -- validate the processing config
    perform ai._validate_processing(processing);

    -- if scheduling is none then indexing must also be none
    -- (the vector index is only ever built by the scheduled job)
    if scheduling operator(pg_catalog.->>) 'implementation' = 'none'
    and indexing operator(pg_catalog.->>) 'implementation' != 'none' then
        raise exception 'automatic indexing is not supported without scheduling. set indexing=>ai.indexing_none() when scheduling=>ai.scheduling_none()';
    end if;

    -- grant select to source table
    perform ai._vectorizer_grant_to_source
    ( _source_schema
    , _source_table
    , grant_to
    );

    -- create the target table
    perform ai._vectorizer_create_target_table
    ( _source_schema
    , _source_table
    , _source_pk
    , target_schema
    , target_table
    , _dimensions
    , grant_to
    );

    -- create queue table
    perform ai._vectorizer_create_queue_table
    ( queue_schema
    , queue_table
    , _source_pk
    , grant_to
    );

    -- create trigger on source table to populate queue
    perform ai._vectorizer_create_source_trigger
    ( _trigger_name
    , queue_schema
    , queue_table
    , _source_schema
    , _source_table
    , _source_pk
    );

    -- create view
    perform ai._vectorizer_create_view
    ( view_schema
    , view_name
    , _source_schema
    , _source_table
    , _source_pk
    , target_schema
    , target_table
    , grant_to
    );

    -- schedule the async ext job
    -- _job_id is null when the scheduling implementation is 'none'
    select ai._vectorizer_schedule_job
    (_vectorizer_id
    , scheduling
    ) into _job_id
    ;
    -- store the job id in the scheduling config; enable/disable_vectorizer_schedule
    -- and drop_vectorizer read it back from config->'scheduling'->'job_id'
    if _job_id is not null then
        scheduling = pg_catalog.jsonb_insert(scheduling, array['job_id'], to_jsonb(_job_id));
    end if;

    insert into ai.vectorizer
    ( id
    , source_schema
    , source_table
    , source_pk
    , target_schema
    , target_table
    , view_schema
    , view_name
    , trigger_name
    , queue_schema
    , queue_table
    , config
    )
    values
    ( _vectorizer_id
    , _source_schema
    , _source_table
    , _source_pk
    , target_schema
    , target_table
    , view_schema
    , view_name
    , _trigger_name
    , queue_schema
    , queue_table
    , pg_catalog.jsonb_build_object
      ( 'version', '0.4.1'
      , 'embedding', embedding
      , 'chunking', chunking
      , 'indexing', indexing
      , 'formatting', formatting
      , 'scheduling', scheduling
      , 'processing', processing
      )
    );

    -- record dependencies in pg_depend
    perform ai._vectorizer_create_dependencies(_vectorizer_id);

    -- grant select on the vectorizer table
    perform ai._vectorizer_grant_to_vectorizer(grant_to);

    -- insert into queue any existing rows from source table
    -- (only the primary-key columns are copied, in attnum order)
    if enqueue_existing is true then
        select pg_catalog.format
        ( $sql$
        insert into %I.%I (%s)
        select %s
        from %I.%I x
        ;
        $sql$
        , queue_schema, queue_table
        , (
            select pg_catalog.string_agg(pg_catalog.format('%I', x.attname), ', ' order by x.attnum)
            from pg_catalog.jsonb_to_recordset(_source_pk) x(attnum int, attname name)
          )
        , (
            select pg_catalog.string_agg(pg_catalog.format('x.%I', x.attname), ', ' order by x.attnum)
            from pg_catalog.jsonb_to_recordset(_source_pk) x(attnum int, attname name)
          )
        , _source_schema, _source_table
        ) into strict _sql
        ;
        execute _sql;
    end if;
    return _vectorizer_id;
end
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- disable_vectorizer_schedule
-- pauses the vectorizer's scheduled background job, if it has one
-- for timescaledb scheduling this runs alter_job(..., scheduled=>false) for the
-- job id stored at config->'scheduling'->'job_id'. nothing happens when
-- scheduling is 'none', when timescaledb is not installed, or when no job
-- with that id exists
-- raises if no vectorizer has the given id
create or replace function ai.disable_vectorizer_schedule(vectorizer_id int) returns void
as $func$
declare
    _vec ai.vectorizer%rowtype;
    _schedule jsonb;
    _job_id bigint;
    _sql text;
begin
    select * into strict _vec
    from ai.vectorizer v
    where v.id operator(pg_catalog.=) vectorizer_id
    ;
    -- disable the scheduled job if exists
    _schedule = _vec.config operator(pg_catalog.->) 'scheduling';
    if _schedule is not null then
        case _schedule operator(pg_catalog.->>) 'implementation'
            when 'none' then -- ok
            when 'timescaledb' then
                _job_id = (_schedule operator(pg_catalog.->) 'job_id')::bigint;
                select pg_catalog.format
                ( $$select %I.alter_job(job_id, scheduled=>false) from timescaledb_information.jobs where job_id = %L$$
                , n.nspname
                , _job_id
                ) into _sql
                from pg_catalog.pg_extension x
                inner join pg_catalog.pg_namespace n on (x.extnamespace operator(pg_catalog.=) n.oid)
                where x.extname operator(pg_catalog.=) 'timescaledb'
                ;
                -- _sql stays null when timescaledb is not installed
                if _sql is not null then
                    execute _sql;
                end if;
        end case;
    end if;
end;
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- enable_vectorizer_schedule
-- resumes the vectorizer's scheduled background job, if it has one
-- for timescaledb scheduling this runs alter_job(..., scheduled=>true) for the
-- job id stored at config->'scheduling'->'job_id'. nothing happens when
-- scheduling is 'none', when timescaledb is not installed, or when no job
-- with that id exists
-- raises if no vectorizer has the given id
create or replace function ai.enable_vectorizer_schedule(vectorizer_id int) returns void
as $func$
declare
    _vec ai.vectorizer%rowtype;
    _schedule jsonb;
    _job_id bigint;
    _sql text;
begin
    select * into strict _vec
    from ai.vectorizer v
    where v.id operator(pg_catalog.=) vectorizer_id
    ;
    -- enable the scheduled job if exists
    _schedule = _vec.config operator(pg_catalog.->) 'scheduling';
    if _schedule is not null then
        case _schedule operator(pg_catalog.->>) 'implementation'
            when 'none' then -- ok
            when 'timescaledb' then
                _job_id = (_schedule operator(pg_catalog.->) 'job_id')::bigint;
                select pg_catalog.format
                ( $$select %I.alter_job(job_id, scheduled=>true) from timescaledb_information.jobs where job_id = %L$$
                , n.nspname
                , _job_id
                ) into _sql
                from pg_catalog.pg_extension x
                inner join pg_catalog.pg_namespace n on (x.extnamespace operator(pg_catalog.=) n.oid)
                where x.extname operator(pg_catalog.=) 'timescaledb'
                ;
                -- _sql stays null when timescaledb is not installed
                if _sql is not null then
                    execute _sql;
                end if;
        end case;
    end if;
end;
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- drop_vectorizer
create or replace function ai.drop_vectorizer
( vectorizer_id int
, drop_all boolean default false
) returns void
as $func$
/* drop_vectorizer
This function does the following:
1. deletes the scheduled job if any
2. drops the trigger from the source table
3. drops the trigger function
4. drops the queue table
5. deletes the vectorizer row

UNLESS drop_all = true, it does NOT:
1. drop the target table containing the embeddings
2. drop the view joining the target and source

Raises if no vectorizer has the given id.
*/
declare
    _vec ai.vectorizer%rowtype;
    _schedule jsonb;
    _job_id bigint;
    _trigger pg_catalog.pg_trigger%rowtype;
    _sql text;
begin
    ---------------------------------------------------------------------------
    -- NOTE: this function is security invoker BUT it is called from an
    -- event trigger that is security definer.
    -- This function needs to STAY security invoker, but we need to treat
    -- it as if it were security definer as far as observing security
    -- best practices
    ---------------------------------------------------------------------------

    -- grab the vectorizer we need to drop
    select v.* into strict _vec
    from ai.vectorizer v
    where v.id operator(pg_catalog.=) vectorizer_id
    ;

    -- delete the scheduled job if exists
    _schedule = _vec.config operator(pg_catalog.->) 'scheduling';
    if _schedule is not null then
        case _schedule operator(pg_catalog.->>) 'implementation'
            when 'none' then -- ok
            when 'timescaledb' then
                _job_id = (_schedule operator(pg_catalog.->) 'job_id')::bigint;
                select pg_catalog.format
                ( $$select %I.delete_job(job_id) from timescaledb_information.jobs where job_id = %L$$
                , n.nspname
                , _job_id
                ) into _sql
                from pg_catalog.pg_extension x
                inner join pg_catalog.pg_namespace n on (x.extnamespace operator(pg_catalog.=) n.oid)
                where x.extname operator(pg_catalog.=) 'timescaledb'
                ;
                if found then
                    execute _sql;
                end if;
        end case;
    end if;

    -- try to look up the trigger so we can find the function/procedure backing the trigger
    -- select only pg_trigger's columns: the target is a pg_trigger row, and
    -- "select *" over the join would also return pg_class and pg_namespace columns
    select g.* into _trigger
    from pg_catalog.pg_trigger g
    inner join pg_catalog.pg_class k
    on (g.tgrelid operator(pg_catalog.=) k.oid
    and k.relname operator(pg_catalog.=) _vec.source_table)
    inner join pg_catalog.pg_namespace n
    on (k.relnamespace operator(pg_catalog.=) n.oid
    and n.nspname operator(pg_catalog.=) _vec.source_schema)
    where g.tgname operator(pg_catalog.=) _vec.trigger_name
    ;

    -- drop the trigger on the source table
    if found then
        select pg_catalog.format
        ( $sql$drop trigger %I on %I.%I$sql$
        , _trigger.tgname
        , _vec.source_schema
        , _vec.source_table
        ) into strict _sql
        ;
        execute _sql;

        -- drop the function/procedure backing the trigger
        select pg_catalog.format
        ( $sql$drop %s %I.%I()$sql$
        , case p.prokind when 'f' then 'function' when 'p' then 'procedure' end
        , n.nspname
        , p.proname
        ) into _sql
        from pg_catalog.pg_proc p
        inner join pg_catalog.pg_namespace n on (n.oid operator(pg_catalog.=) p.pronamespace)
        where p.oid operator(pg_catalog.=) _trigger.tgfoid
        ;
        if found then
            execute _sql;
        end if;
    else
        -- the trigger is missing. try to find the backing function by name and return type
        select pg_catalog.format
        ( $sql$drop %s %I.%I() cascade$sql$ -- cascade in case the trigger still exists somehow
        , case p.prokind when 'f' then 'function' when 'p' then 'procedure' end
        , n.nspname
        , p.proname
        ) into _sql
        from pg_catalog.pg_proc p
        inner join pg_catalog.pg_namespace n on (n.oid operator(pg_catalog.=) p.pronamespace)
        inner join pg_catalog.pg_type y on (p.prorettype operator(pg_catalog.=) y.oid)
        where n.nspname operator(pg_catalog.=) _vec.queue_schema
        and p.proname operator(pg_catalog.=) _vec.trigger_name
        and y.typname operator(pg_catalog.=) 'trigger'
        ;
        if found then
            execute _sql;
        end if;
    end if;

    -- drop the queue table if exists
    select pg_catalog.format
    ( $sql$drop table if exists %I.%I$sql$
    , _vec.queue_schema
    , _vec.queue_table
    ) into strict _sql;
    execute _sql;

    if drop_all then
        -- drop the view if exists
        select pg_catalog.format
        ( $sql$drop view if exists %I.%I$sql$
        , _vec.view_schema
        , _vec.view_name
        ) into strict _sql;
        execute _sql;

        -- drop the target table if exists
        select pg_catalog.format
        ( $sql$drop table if exists %I.%I$sql$
        , _vec.target_schema
        , _vec.target_table
        ) into strict _sql;
        execute _sql;
    end if;

    -- delete the vectorizer row
    delete from ai.vectorizer v
    where v.id operator(pg_catalog.=) vectorizer_id
    ;

end;
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- vectorizer_queue_pending
-- returns the number of rows in the vectorizer's queue table
-- with exact_count => false (the default) at most 10001 rows are scanned, and
-- a queue holding more than 10000 rows is reported as the max bigint value
-- raises 'vectorizer has no queue table' when the vectorizer has no queue
-- table or no vectorizer has the given id
create or replace function ai.vectorizer_queue_pending
( vectorizer_id int
, exact_count boolean default false
) returns bigint
as $func$
declare
    _queue_schema name;
    _queue_table name;
    _sql text;
    _queue_depth bigint;
begin
    select v.queue_schema, v.queue_table into _queue_schema, _queue_table
    from ai.vectorizer v
    where v.id operator(pg_catalog.=) vectorizer_id
    ;
    -- also reached when no row matched: both locals are then null
    if _queue_schema is null or _queue_table is null then
        raise exception 'vectorizer has no queue table';
    end if;
    if exact_count then
        select pg_catalog.format
        ( $sql$select pg_catalog.count(1) from %I.%I$sql$
        , _queue_schema, _queue_table
        ) into strict _sql
        ;
        execute _sql into strict _queue_depth;
    else
        -- the subquery is aliased: PostgreSQL before 16 rejects an unaliased subquery in FROM
        select pg_catalog.format
        ( $sql$select pg_catalog.count(*) from (select 1 from %I.%I limit 10001) x$sql$
        , _queue_schema, _queue_table
        ) into strict _sql
        ;
        execute _sql into strict _queue_depth;
        if _queue_depth operator(pg_catalog.=) 10001 then
            _queue_depth = 9223372036854775807; -- max bigint value
        end if;
    end if;

    return _queue_depth;
end;
$func$ language plpgsql stable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- vectorizer_status
-- one row per vectorizer: quoted source/target/view names, plus the pending
-- queue item count from ai.vectorizer_queue_pending; pending_items is null
-- when the vectorizer has no queue table, the queue table does not exist, or
-- current_user lacks select on it
create or replace view ai.vectorizer_status as
select
  v.id
, pg_catalog.format('%I.%I', v.source_schema, v.source_table) as source_table
, pg_catalog.format('%I.%I', v.target_schema, v.target_table) as target_table
, pg_catalog.format('%I.%I', v.view_schema, v.view_name) as "view"
, case
    -- tested first so format('%I') below never receives a null identifier;
    -- AND-ed conditions have no guaranteed evaluation order, CASE branches do
    when v.queue_schema is null or v.queue_table is null then null
    -- to_regclass yields null for a missing table, and the oid variant of
    -- has_table_privilege then yields null instead of raising
    when pg_catalog.has_table_privilege
      ( current_user
      , pg_catalog.to_regclass(pg_catalog.format('%I.%I', v.queue_schema, v.queue_table))::pg_catalog.oid
      , 'select'
      )
      then ai.vectorizer_queue_pending(v.id)
    else null
  end as pending_items
from ai.vectorizer v
;


--------------------------------------------------------------------------------
-- 014-secrets.sql
-------------------------------------------------------------------------------
-- reveal_secret
-- Return whatever ai.secrets.reveal_secret yields for secret_name (presumably
-- the secret's value), using the Python library shipped for this extension
-- version.
-- use_cache: true (default) passes this function's SD dict to
--   ai.secrets.reveal_secret; false first calls
--   ai.secrets.remove_secret_from_cache on SD for this secret, then calls
--   reveal_secret with None instead of SD.
-- NOTE(review): where secrets are read from, and any permission check, live in
-- the ai.secrets Python module, which is not visible here.
create or replace function ai.reveal_secret(secret_name text, use_cache boolean default true) returns text
as $python$
    # GD is PL/Python's session-wide dict shared by all Python functions, so the
    # setup below runs once per session
    if "ai.version" not in GD:
        # library root: the ai.python_lib_dir setting, else /usr/local/lib/pgai;
        # this version's code is in its "0.4.1" subdirectory, which must be put
        # on sys.path (site.addsitedir) before the ai package is imported
        r = plpy.execute("select coalesce(pg_catalog.current_setting('ai.python_lib_dir', true), '/usr/local/lib/pgai') as python_lib_dir")
        python_lib_dir = r[0]["python_lib_dir"]
        from pathlib import Path
        python_lib_dir = Path(python_lib_dir).joinpath("0.4.1")
        import site
        site.addsitedir(str(python_lib_dir))
        # confirm the importable ai package matches this extension version
        from ai import __version__ as ai_version
        assert("0.4.1" == ai_version)
        GD["ai.version"] = "0.4.1"
    else:
        # another extension version's library was already loaded in this
        # session; plpy.fatal raises a FATAL error, which ends the session
        if GD["ai.version"] != "0.4.1":
            plpy.fatal("the pgai extension version has changed. start a new session")
    import ai.secrets
    # SD is this function's private per-session dict; presumably ai.secrets
    # uses it as the secret cache (remove_secret_from_cache operates on it)
    if use_cache:
        return ai.secrets.reveal_secret(plpy, secret_name, SD)
    else:
        # evict any cached entry for this secret, then look it up without a cache
        ai.secrets.remove_secret_from_cache(SD, secret_name)
        return ai.secrets.reveal_secret(plpy, secret_name, None)
$python$
language plpython3u stable security invoker
set search_path to pg_catalog, pg_temp;

-------------------------------------------------------------------------------
-- secret_permissions
-- The rows of ai._secret_permissions whose "role" names an existing role that
-- current_user is a member of (pg_has_role ... 'member').
-- "role" is passed through quote_ident so to_regrole matches the stored name
-- exactly instead of case-folding it. This makes the existence test agree with
-- the membership test. A missing role yields a null oid; the strict
-- pg_has_role then returns null and the row is filtered out, without relying
-- on AND evaluation order.
create or replace view ai.secret_permissions as
select *
from ai._secret_permissions
where pg_catalog.pg_has_role
      ( current_user
      , pg_catalog.to_regrole(pg_catalog.quote_ident("role"))
      , 'member'
      );

-------------------------------------------------------------------------------
-- grant_secret
-- Record that grant_to_role may use the secret named secret_name by adding the
-- pair to ai._secret_permissions (the ai.secret_permissions view exposes such
-- rows to members of that role).
-- Granting a pair that is already recorded is a no-op. This mirrors
-- ai.revoke_secret, which is silent when nothing matches, and the
-- ai.grant_ai_usage insert into the same table.
create or replace function ai.grant_secret(secret_name text, grant_to_role text) returns void
as $func$
    insert into ai._secret_permissions ("name", "role")
    values (secret_name, grant_to_role)
    on conflict on constraint _secret_permissions_pkey
    do nothing;
$func$ language sql volatile security invoker
set search_path to pg_catalog, pg_temp;

-------------------------------------------------------------------------------
-- revoke_secret
-- Remove every ai._secret_permissions row that pairs secret_name with
-- revoke_from_role. Revoking a pair that was never granted changes nothing.
create or replace function ai.revoke_secret(secret_name text, revoke_from_role text) returns void
as $func$
    delete from ai._secret_permissions as p
    where p."role" operator(pg_catalog.=) revoke_from_role
      and p."name" operator(pg_catalog.=) secret_name
    ;
$func$ language sql volatile security invoker
set search_path to pg_catalog, pg_temp;


--------------------------------------------------------------------------------
-- 999-privileges.sql

-------------------------------------------------------------------------------
-- grant_ai_usage
-- Grant to_user privileges on the ai schema and on every table, sequence,
-- view, function, and procedure that belongs to the ai extension.
--
-- to_user: role receiving the grants.
-- admin:   true  -> "all privileges ... with grant option" on the schema and on
--                   every extension object, plus a ('*', to_user) row in
--                   ai._secret_permissions.
--          false -> usage/create on the schema; select/insert/update/delete on
--                   tables; usage/select/update on sequences; select on views;
--                   execute on functions and procedures. ai.migration,
--                   ai._secret_permissions, ai.feature_flag and the
--                   grant_ai_usage/grant_secret/revoke_secret functions are
--                   skipped.
--          null is treated as false.
-- Runs as the caller (security invoker), so the caller must itself be able to
-- grant these privileges.
create or replace function ai.grant_ai_usage(to_user name, admin bool default false) returns void
as $func$
declare
    _admin bool;
    _grant_option text;
    _sql text;
begin
    -- normalize once: a null admin would make the exclusion filters below
    -- evaluate to null and silently drop every table grant
    _admin = coalesce(admin, false);
    _grant_option = case when _admin then ' with grant option' else '' end;

    -- schema
    _sql = pg_catalog.format
    ( 'grant %s on schema ai to %I%s'
    , case when _admin then 'all privileges' else 'usage, create' end
    , to_user
    , _grant_option
    );
    raise debug '%', _sql;
    execute _sql;

    -- tables, sequences, and views that belong to the extension
    for _sql in
    (
        select pg_catalog.format
        ( 'grant %s on %s %I.%I to %I%s'
        , case
            when _admin then 'all privileges'
            when k.relkind in ('r', 'p') then 'select, insert, update, delete'
            when k.relkind operator(pg_catalog.=) 'S' then 'usage, select, update'
            else 'select' -- views
          end
          -- GRANT ... ON TABLE also accepts views
        , case when k.relkind operator(pg_catalog.=) 'S' then 'sequence' else 'table' end
        , n.nspname
        , k.relname
        , to_user
        , _grant_option
        )
        from pg_catalog.pg_depend d
        inner join pg_catalog.pg_extension e on (d.refobjid operator(pg_catalog.=) e.oid)
        inner join pg_catalog.pg_class k on (d.objid operator(pg_catalog.=) k.oid)
        inner join pg_catalog.pg_namespace n on (k.relnamespace operator(pg_catalog.=) n.oid)
        where d.refclassid operator(pg_catalog.=) 'pg_catalog.pg_extension'::pg_catalog.regclass
        and d.deptype operator(pg_catalog.=) 'e'
        and e.extname operator(pg_catalog.=) 'ai'
        and k.relkind in ('r', 'p', 'S', 'v') -- tables, sequences, and views
        -- only admins get any access to these tables
        and
        ( _admin
          or not
          ( n.nspname operator(pg_catalog.=) 'ai'
            and k.relname in ('migration', '_secret_permissions', 'feature_flag')
          )
        )
        order by n.nspname, k.relname
    )
    loop
        raise debug '%', _sql;
        execute _sql;
    end loop;

    -- procedures and functions that belong to the extension
    for _sql in
    (
        select pg_catalog.format
        ( 'grant %s on %s %I.%I(%s) to %I%s'
        , case when _admin then 'all privileges' else 'execute' end
        , case k.prokind
              when 'f' then 'function'
              when 'p' then 'procedure'
          end
        , n.nspname
        , k.proname
        , pg_catalog.pg_get_function_identity_arguments(k.oid)
        , to_user
        , _grant_option
        )
        from pg_catalog.pg_depend d
        inner join pg_catalog.pg_extension e on (d.refobjid operator(pg_catalog.=) e.oid)
        inner join pg_catalog.pg_proc k on (d.objid operator(pg_catalog.=) k.oid)
        inner join pg_catalog.pg_namespace n on (k.pronamespace operator(pg_catalog.=) n.oid)
        where d.refclassid operator(pg_catalog.=) 'pg_catalog.pg_extension'::pg_catalog.regclass
        and d.deptype operator(pg_catalog.=) 'e'
        and e.extname operator(pg_catalog.=) 'ai'
        and k.prokind in ('f', 'p')
        -- only admins get these functions
        and (_admin or k.proname not in ('grant_ai_usage', 'grant_secret', 'revoke_secret'))
        order by n.nspname, k.proname, pg_catalog.pg_get_function_identity_arguments(k.oid)
    )
    loop
        raise debug '%', _sql;
        execute _sql;
    end loop;

    -- secret permissions
    if _admin then
        -- record a '*' secret permission for admin users
        insert into ai._secret_permissions ("name", "role")
        values ('*', to_user)
        on conflict on constraint _secret_permissions_pkey
        do nothing
        ;
    end if;
end
$func$ language plpgsql volatile
security invoker -- gotta have privs to give privs
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- grant admin usage to session user and pg_database_owner
-- (admin => true: all privileges with grant option on the extension's objects,
-- plus a '*' row in ai._secret_permissions for each grantee)
select ai.grant_ai_usage(g.grantee, admin => true)
from
( values
  (pg_catalog."session_user"())
, ('pg_database_owner')
) as g(grantee);

-------------------------------------------------------------------------------
-- revoke everything from public
-- Remove every privilege PUBLIC holds on the ai schema and on each table,
-- sequence, view, function, and procedure that belongs to the ai extension.
-- Catalogs are schema-qualified because this DO block does not pin its own
-- search_path, and both loops run in a deterministic order.
do language plpgsql $func$
declare
    _sql text;
begin
    -- schema
    revoke all privileges on schema ai from public;

    -- tables, sequences, and views
    for _sql in
    (
        select pg_catalog.format
        ( 'revoke all privileges on %s %I.%I from public'
          -- REVOKE ... ON TABLE also accepts views
        , case when k.relkind operator(pg_catalog.=) 'S' then 'sequence' else 'table' end
        , n.nspname
        , k.relname
        )
        from pg_catalog.pg_depend d
        inner join pg_catalog.pg_extension e on (d.refobjid operator(pg_catalog.=) e.oid)
        inner join pg_catalog.pg_class k on (d.objid operator(pg_catalog.=) k.oid)
        inner join pg_catalog.pg_namespace n on (k.relnamespace operator(pg_catalog.=) n.oid)
        where d.refclassid operator(pg_catalog.=) 'pg_catalog.pg_extension'::pg_catalog.regclass
        and d.deptype operator(pg_catalog.=) 'e'
        and e.extname operator(pg_catalog.=) 'ai'
        and k.relkind in ('r', 'p', 'S', 'v') -- tables, sequences, and views
        order by n.nspname, k.relname
    )
    loop
        raise debug '%', _sql;
        execute _sql;
    end loop;

    -- procedures and functions
    for _sql in
    (
        select pg_catalog.format
        ( 'revoke all privileges on %s %I.%I(%s) from public'
        , case k.prokind
              when 'f' then 'function'
              when 'p' then 'procedure'
          end
        , n.nspname
        , k.proname
        , pg_catalog.pg_get_function_identity_arguments(k.oid)
        )
        from pg_catalog.pg_depend d
        inner join pg_catalog.pg_extension e on (d.refobjid operator(pg_catalog.=) e.oid)
        inner join pg_catalog.pg_proc k on (d.objid operator(pg_catalog.=) k.oid)
        inner join pg_catalog.pg_namespace n on (k.pronamespace operator(pg_catalog.=) n.oid)
        where d.refclassid operator(pg_catalog.=) 'pg_catalog.pg_extension'::pg_catalog.regclass
        and d.deptype operator(pg_catalog.=) 'e'
        and e.extname operator(pg_catalog.=) 'ai'
        and k.prokind in ('f', 'p')
        order by n.nspname, k.proname, pg_catalog.pg_get_function_identity_arguments(k.oid)
    )
    loop
        raise debug '%', _sql;
        execute _sql;
    end loop;
end
$func$;


